Я передал данные из тем Kafka, используя Spark. Это код, который я пробовал. Здесь я просто отображал потоковые данные в консоли. Я хочу сохранить эти данные в виде текстового файла в HDFS.
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
object StreamingDataNew {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaConf = Map(
"metadata.broker.list" -> "localhost:9092",
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "kafka-streaming-example",
"zookeeper.connection.timeout.ms" -> "200000"
)
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
ssc,
kafkaConf,
Map("topic-one" -> 1), // subscripe to topic and partition 1
StorageLevel.MEMORY_ONLY
)
println("printing" + lines.toString())
val words = lines.flatMap { case (x, y) => y.split(" ") }
words.print()
ssc.start()
ssc.awaitTermination()
}
}
Я обнаружил, что мы можем написать DStream, используя saveAsTextFiles. Но может ли кто-нибудь четко указать шаги по подключению к Hortonworks и сохранению в HDFS с использованием приведенного выше кода scala.
rdd.foreach(println)
ничего не показывает? - person Sivaprasanna Sethuraman   schedule 22.05.2018