Как вывести записи из Kafka на консоль?

Я изучаю структурированную потоковую передачу, и мне не удалось отобразить вывод на консоль.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

object kafka_stream {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("kafka-consumer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    spark.sparkContext.setLogLevel("WARN")

//    val schema = StructType().add("a", IntegerType()).add("b", StringType())

    val schema = StructType(Seq(
      StructField("a", IntegerType, true),
      StructField("b", StringType, true)
    ))

    val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "172.21.0.187:9093")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()


    val values = df.selectExpr("CAST(value AS STRING)").as[String]

    values.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()


  }


}

Мой вклад в Кафку

my name is abc how are you ?

Я просто хочу отображать строки из Kafka в искровую консоль.


person HIREN GALA    schedule 20.07.2017    source источник
comment
Почему вы используете .outputMode("append")? Какая-то конкретная причина? Можете ли вы показать, как вы отправляете сообщение Кафке? производитель кафки-консоли? Вы уверены, что используете тему test? Что такое 9093?! Я думаю, что это должно быть 9092.   -  person Jacek Laskowski    schedule 20.07.2017
comment
Как я уже упоминал в вопросе, мой ввод в kafka - это то, как я отправляю ввод в kafka, и я использую добавление, потому что мне нужен последний вывод   -  person HIREN GALA    schedule 20.07.2017
comment
Насколько я понимаю, мой вклад в kafka — это то, что вы отправляете в Kafka, а не то, как вы это делаете. То, как вы отправляете, может помочь нам понять, почему вы не видите вывод кода. Append используется по умолчанию, поэтому нет смысла настраивать его явно (если вы не знаете, почему вы это делаете). Удалите ненужные вещи, чтобы мы очистили код и, надеюсь, заставили его работать.   -  person Jacek Laskowski    schedule 20.07.2017
comment
Похоже, это просто потому, что вы ввели неправильный адрес брокера. Потребитель Kafka будет пытаться вечно, если не сможет подключиться к брокеру.   -  person zsxwing    schedule 20.07.2017
comment
Нет, он правильно подключается, потому что, когда я делаю df.streaming(), он печатает true   -  person HIREN GALA    schedule 21.07.2017
comment
df.streaming ()? Не могли бы вы уточнить это? streaming метода нет.   -  person zsxwing    schedule 24.07.2017
comment
Если вы имели в виду isStreaming, то результат ничего не значит, за исключением того, что ваш DataFrame является потоковым DataFrame, и Spark не подключится к Kafka, пока вы не вызовете start().   -  person zsxwing    schedule 24.07.2017
comment
Но я уже вызываю метод start() в своем коде.   -  person HIREN GALA    schedule 25.07.2017