Как сохранить потоковые данные искры в Hdfs в Hortonworks?

Я передал данные из тем Kafka, используя Spark. Это код, который я пробовал. Здесь я просто отображал потоковые данные в консоли. Я хочу сохранить эти данные в виде текстового файла в HDFS.

import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
object StreamingDataNew {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaConf = Map(
      "metadata.broker.list" -> "localhost:9092",
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "kafka-streaming-example",
      "zookeeper.connection.timeout.ms" -> "200000"
    )
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      ssc,
      kafkaConf,
      Map("topic-one" -> 1), // subscripe to topic and partition 1
      StorageLevel.MEMORY_ONLY
    )
    println("printing" + lines.toString())
    val words = lines.flatMap { case (x, y) => y.split(" ") }
    words.print()

    ssc.start()
    ssc.awaitTermination()

  }
}

Я обнаружил, что мы можем написать DStream, используя saveAsTextFiles. Но может ли кто-нибудь четко указать шаги по подключению к Hortonworks и сохранению в HDFS с использованием приведенного выше кода scala.


person Amr916    schedule 22.05.2018    source источник
comment
Возможный дубликат Spark Streaming From Kafka и Write to HDFS в формате Avro   -  person Sivaprasanna Sethuraman    schedule 22.05.2018
comment
@SivaprasannaSethuraman Спасибо.   -  person Amr916    schedule 22.05.2018
comment
@SivaprasannaSethuraman Я попробовал возможные решения в приведенной выше ссылке на вопрос, которой вы поделились. Это не сработало для меня.   -  person Amr916    schedule 22.05.2018
comment
Просто говоря, что это не работает для меня, нам мало что поможет. Подскажите, пожалуйста, с какой проблемой вы столкнулись?   -  person Sivaprasanna Sethuraman    schedule 22.05.2018
comment
@SivaprasannaSethuraman Я попытался сохранить данные, передаваемые в виде текстовых файлов, на локальном диске. stream.map(_.value).foreachRDD( rdd =› { rdd.foreach(println) if (!rdd.isEmpty()) { rdd.saveAsTextFile(C:/data/spark/) } }) Форматы файлов указаны в формат crc. Так же пришли файлы на общее количество разделов но я не могу просмотреть потоковое содержимое.   -  person Amr916    schedule 22.05.2018
comment
CRC существуют для контрольной суммы. rdd.foreach(println) ничего не показывает?   -  person Sivaprasanna Sethuraman    schedule 22.05.2018
comment
@Sivaprasanna Sethuraman Я вижу сообщения, когда пытаюсь открыть файл. Но проблема в том, что я получаю новые файлы для каждого раздела.   -  person Amr916    schedule 22.05.2018
comment
Вот как работает Spark, если вы хотите меньше файлов, вы можете переразбить (исходя из требуемого параллелизма)   -  person Sivaprasanna Sethuraman    schedule 22.05.2018


Ответы (1)


Я нашел ответ, этот код сработал для меня.

package com.spark.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object MessageStreaming {
  def main(args: Array[String]): Unit = {
    println("Message streaming")

    val conf = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("kafka-streaming")
    val context = new SparkContext(conf)
    val ssc = new StreamingContext(context, org.apache.spark.streaming.Seconds(10))
    val kafkaParams = Map(
      "bootstrap.servers" -> "kafka.kafka-cluster.com:9092",
      "group.id" -> "kafka-streaming-example",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "zookeeper.connection.timeout.ms" -> "200000"
    )
    val topics = Array("cdc-classic")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    val content = stream.filter(x => x.value() != null)
    val sqlContext = new org.apache.spark.sql.SQLContext(context)
    import sqlContext.implicits._

    stream.map(_.value).foreachRDD(rdd => {

      rdd.foreach(println)
      if (!rdd.isEmpty()) {
     rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append).json("hdfs://dev1a/user/hg5tv0/hadoop/MessagesFromKafka")


      }

    })
    ssc.start()
    ssc.awaitTermination

}}
person Amr916    schedule 01.06.2018