Как создать собственный источник потоковых данных?

У меня есть специальный ридер для Spark Streaming, который читает данные из WebSocket. Я собираюсь попробовать Spark Structured Streaming.

Как создать источник потоковых данных в Spark Structured Streaming?


person szu    schedule 02.12.2017    source источник


Ответы (3)


Источник потоковых данных реализует org.apache.spark.sql.execution.streaming.Source.

Scalaadoc org.apache.spark.sql.execution.streaming.Source должен дать вам достаточно информации, чтобы начать работу (просто следуйте типам, чтобы разработать компилируемый тип Scala).

Получив Source, вы должны зарегистрировать его, чтобы использовать его в format из DataStreamReader. Уловка, позволяющая сделать источник потоковой передачи доступным, чтобы вы могли использовать его для format, состоит в том, чтобы зарегистрировать его, создав DataSourceRegister для источника потоковой передачи. Вы можете найти примеры в META-INF / services / org.apache.spark.sql.sources.DataSourceRegister:

org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider

Это файл, который связывает короткое имя в format с реализацией.

Во время семинаров по Spark я обычно рекомендую людям начинать разработку с обеих сторон:

  1. Напишите потоковый запрос (с format), например

    val input = spark
      .readStream
      .format("yourCustomSource") // <-- your custom source here
      .load
    
  2. Реализуйте потоковую передачу Source и соответствующий DataSourceRegister (это может быть один и тот же класс)

  3. (необязательно) Зарегистрируйте DataSourceRegister, записав полное имя класса, скажем com.mycompany.spark.MyDataSourceRegister, в META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

    $ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
    com.mycompany.spark.MyDataSourceRegister
    

Последний шаг, на котором вы регистрируете реализацию DataSourceRegister для своего пользовательского Source, является необязательным и предназначен только для регистрации псевдонима источника данных, который ваши конечные пользователи используют в DataFrameReader.format.

format (source: String): DataFrameReader Определяет формат источника входных данных.

Просмотрите код org.apache.spark.sql.execution.streaming.RateSourceProvider для хорошего старта.

person Jacek Laskowski    schedule 02.12.2017
comment
Где org.apache.spark.sql.execution.streaming.FileStreamSource зарегистрирован? - person lfk; 12.07.2018
comment
@lfk Добавлены дополнительные пояснения к шагу регистрации. Дайте мне знать, если все еще непонятно. Спасибо. - person Jacek Laskowski; 13.07.2018
comment
TextFileFormat, например, реализует DataSourceRegister, но не Source. Фактический Source создается DataSource.createSource и FileStreamSource для всех экземпляров FileFormat (строка 265 DataSource.scala, spark-sql_2.11). Я могу реализовать DataSourceRegister, но до сих пор не понимаю, как использовать собственный экземпляр Source. - person lfk; 13.07.2018
comment
Как вы думаете, почему TextFileFormat отвечает за потоковую передачу наборов данных для текстовых файлов? Он используется только для пакетных / непотоковых наборов данных. Посмотрите на FileStreamSourceDataSource.createSource, который его регистрирует). Почему бы вам не реализовать Source с DataSourceRegister? - person Jacek Laskowski; 15.07.2018
comment
Я пробовал реализовать Source и DataSourceRegister. Код не работает с java.lang.UnsupportedOperationException: Data source test does not support streamed reading. Отметьте DataSource.scala:275. Необходимо реализовать StreamSourceProvider, а не Source. Это, конечно, для V1. Источники данных V2 используют другой кодовый путь. - person lfk; 16.07.2018

Поскольку Spark переходит на API V2, теперь вам необходимо реализовать DataSourceV2, MicroBatchReadSupport и DataSourceRegister.

Это потребует создания вашей собственной реализации Offset, MicroBatchReader, DataReader<Row> и DataReaderFactory<Row>.

В Интернете (на Scala) есть несколько примеров пользовательских структурированных потоковых потоков, которые помогли мне в написании моя.

После того, как вы внедрили свой собственный источник, вы можете следовать ответу Яцека Ласковски при регистрации источника.

Кроме того, в зависимости от кодировки сообщений, которые вы будете получать из сокета, вы можете просто использовать источник сокета по умолчанию и использовать настраиваемую функцию карты для синтаксического анализа информации в любые Beans, которые вы будете использовать. Однако обратите внимание, что Spark говорит, что источник потоковой передачи сокетов по умолчанию не должен использоваться в производстве!

Надеюсь это поможет!

person alz2    schedule 01.06.2018

Также Здесь представлен образец реализации пользовательского WebSocket Stream Reader / Writer, который реализует Offset, MicroBatchReader, DataReader<Row>, и DataReaderFactory<Row>

person dumitru    schedule 20.11.2019