У меня есть специальный ридер для Spark Streaming, который читает данные из WebSocket. Я собираюсь попробовать Spark Structured Streaming.
Как создать источник потоковых данных в Spark Structured Streaming?
У меня есть специальный ридер для Spark Streaming, который читает данные из WebSocket. Я собираюсь попробовать Spark Structured Streaming.
Как создать источник потоковых данных в Spark Structured Streaming?
Источник потоковых данных реализует org.apache.spark.sql.execution.streaming.Source.
Scalaadoc org.apache.spark.sql.execution.streaming.Source
должен дать вам достаточно информации, чтобы начать работу (просто следуйте типам, чтобы разработать компилируемый тип Scala).
Получив Source
, вы должны зарегистрировать его, чтобы использовать его в format
из DataStreamReader
. Уловка, позволяющая сделать источник потоковой передачи доступным, чтобы вы могли использовать его для format
, состоит в том, чтобы зарегистрировать его, создав DataSourceRegister
для источника потоковой передачи. Вы можете найти примеры в META-INF / services / org.apache.spark.sql.sources.DataSourceRegister:
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
Это файл, который связывает короткое имя в format
с реализацией.
Во время семинаров по Spark я обычно рекомендую людям начинать разработку с обеих сторон:
Напишите потоковый запрос (с format
), например
val input = spark
.readStream
.format("yourCustomSource") // <-- your custom source here
.load
Реализуйте потоковую передачу Source
и соответствующий DataSourceRegister
(это может быть один и тот же класс)
(необязательно) Зарегистрируйте DataSourceRegister
, записав полное имя класса, скажем com.mycompany.spark.MyDataSourceRegister
, в META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
:
$ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.mycompany.spark.MyDataSourceRegister
Последний шаг, на котором вы регистрируете реализацию DataSourceRegister
для своего пользовательского Source
, является необязательным и предназначен только для регистрации псевдонима источника данных, который ваши конечные пользователи используют в DataFrameReader.format.
format (source: String): DataFrameReader Определяет формат источника входных данных.
Просмотрите код org.apache.spark.sql.execution.streaming.RateSourceProvider для хорошего старта.
org.apache.spark.sql.execution.streaming.FileStreamSource
зарегистрирован?
- person lfk; 12.07.2018
TextFileFormat
, например, реализует DataSourceRegister
, но не Source
. Фактический Source
создается DataSource.createSource
и FileStreamSource
для всех экземпляров FileFormat
(строка 265 DataSource.scala, spark-sql_2.11). Я могу реализовать DataSourceRegister
, но до сих пор не понимаю, как использовать собственный экземпляр Source
.
- person lfk; 13.07.2018
TextFileFormat
отвечает за потоковую передачу наборов данных для текстовых файлов? Он используется только для пакетных / непотоковых наборов данных. Посмотрите на FileStreamSource
(и DataSource.createSource
, который его регистрирует). Почему бы вам не реализовать Source
с DataSourceRegister
?
- person Jacek Laskowski; 15.07.2018
Source
и DataSourceRegister
. Код не работает с java.lang.UnsupportedOperationException: Data source test does not support streamed reading
. Отметьте DataSource.scala:275
. Необходимо реализовать StreamSourceProvider
, а не Source
. Это, конечно, для V1. Источники данных V2 используют другой кодовый путь.
- person lfk; 16.07.2018
Поскольку Spark переходит на API V2, теперь вам необходимо реализовать DataSourceV2, MicroBatchReadSupport и DataSourceRegister.
Это потребует создания вашей собственной реализации Offset
, MicroBatchReader
, DataReader<Row>
и DataReaderFactory<Row>
.
В Интернете (на Scala) есть несколько примеров пользовательских структурированных потоковых потоков, которые помогли мне в написании моя.
После того, как вы внедрили свой собственный источник, вы можете следовать ответу Яцека Ласковски при регистрации источника.
Кроме того, в зависимости от кодировки сообщений, которые вы будете получать из сокета, вы можете просто использовать источник сокета по умолчанию и использовать настраиваемую функцию карты для синтаксического анализа информации в любые Beans, которые вы будете использовать. Однако обратите внимание, что Spark говорит, что источник потоковой передачи сокетов по умолчанию не должен использоваться в производстве!
Надеюсь это поможет!
Также Здесь представлен образец реализации пользовательского WebSocket Stream Reader / Writer, который реализует Offset, MicroBatchReader, DataReader<Row>
, и DataReaderFactory<Row>