Я изучаю структурированную потоковую передачу, и мне не удалось отобразить вывод на консоль.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime
object kafka_stream {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("kafka-consumer")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("WARN")
// val schema = StructType().add("a", IntegerType()).add("b", StringType())
val schema = StructType(Seq(
StructField("a", IntegerType, true),
StructField("b", StringType, true)
))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.21.0.187:9093")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
val values = df.selectExpr("CAST(value AS STRING)").as[String]
values.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
}
}
Мой вклад в Кафку
my name is abc how are you ?
Я просто хочу отображать строки из Kafka в искровую консоль.
.outputMode("append")
? Какая-то конкретная причина? Можете ли вы показать, как вы отправляете сообщение Кафке? производитель кафки-консоли? Вы уверены, что используете темуtest
? Что такое 9093?! Я думаю, что это должно быть 9092. - person Jacek Laskowski   schedule 20.07.2017streaming
метода нет. - person zsxwing   schedule 24.07.2017isStreaming
, то результат ничего не значит, за исключением того, что ваш DataFrame является потоковым DataFrame, и Spark не подключится к Kafka, пока вы не вызоветеstart()
. - person zsxwing   schedule 24.07.2017