Как записать набор потоковых данных в Kafka?

Я пытаюсь немного обогатить данные тем. Поэтому читайте из Kafka обратно в Kafka, используя структурированную потоковую передачу Spark.

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


val enriched = ds.select("key", "value", "topic").as[(String, String, String)].map(record => enrich(record._1,
      record._2, record._3)

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

Но я получаю исключение:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Любые обходные пути?


person ppanero    schedule 24.03.2017    source источник


Ответы (3)


Как T. Гавенда упомянул, что не существует формата kafka для записи потоковых наборов данных в Kafka (то есть в приемник Kafka).

В настоящее время рекомендуемым решением в Spark 2.1 является использование оператор foreach.

Операция foreach позволяет выполнять произвольные операции над выходными данными. Начиная со Spark 2.1 это доступно только для Scala и Java. Чтобы использовать это, вам нужно будет реализовать интерфейс ForeachWriter (документация Scala/Java), у которого есть методы, которые вызываются всякий раз, когда есть последовательность строк, сгенерированных в качестве вывода после триггера. Обратите внимание на следующие важные моменты.

person Jacek Laskowski    schedule 24.03.2017
comment
Хотелось бы увидеть реализацию. Был бы признателен за суть, как только вы закончите. Спасибо! - person Jacek Laskowski; 27.03.2017
comment
я использую версию spark 2.2.snapshot из ночных сборок, я сослался на maven, добавив репозитории моментальных снимков apache, теперь я протестирую синхронизацию kafka, а позже, если хотите, я могу поделиться опытом =) - person Danilow; 21.06.2017

В Spark 2.1 (который в настоящее время является последней версией Spark) его нет. В следующем выпуске — 2.2 — будет Kafka Writer, см. этот коммит.

Kafka Sink — это то же самое, что и Kafka Writer.

person T. Gawęda    schedule 24.03.2017

Попробуй это

ds.map(_.toString.getBytes).toDF("value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092"))
      .option("topic", topic)
      .start
      .awaitTermination()
person Ayush Hooda    schedule 07.02.2019
comment
объясните больше о своем решении, поймите его намного лучше - person TheParam; 07.02.2019
comment
Вам просто нужно преобразовать набор данных обратно в фреймворк данных со значением имени столбца, поскольку kafka требует пары ключ-значение, в которой значение является обязательным полем. - person Ayush Hooda; 09.02.2019