Spark Kafka Cassandra not working

My streaming application does not save data to Cassandra. I have tried various approaches with foreachRDD and stream.print to figure out why it isn't working, but nothing is printed.
For the input data I use kafka-console-producer.sh.
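
A typical debug tap of the kind I tried looks like this (a minimal sketch; `stream` is the DStream created in the code below, and the tap goes before ssc.start()). Even this prints nothing:

// Debug tap: count and sample each micro-batch of the Kafka stream.
// `stream` is the DStream[ConsumerRecord[String, String]] built below.
stream.foreachRDD { rdd =>
  println(s"batch record count: ${rdd.count()}")
  rdd.take(5).foreach(r => println(s"${r.key} -> ${r.value}"))
}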

object letsRun extends App {
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming._
  import com.datastax.spark.connector._
  import com.datastax.spark.connector.streaming._
  import com.datastax.spark.connector.writer._
  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010._
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  // Local Spark context with the Cassandra host configured for the connector
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("test")
    .set("spark.cassandra.connection.host", "192.168.1.44")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.sparkContext.setLogLevel("WARN")

  // Standard Kafka 0.10 direct-stream consumer configuration
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "192.168.1.46:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val topics = Set[String]("testTopic")
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
  // Only needed when writing Spark SQL Row objects; harmless for tuple rows
  implicit val sqlRowWriter = SqlRowWriter.Factory

  // Map each Kafka record to a (key, value) pair and write it to Cassandra
  val messages = stream.map(record => (record.key, record.value))
  messages.saveToCassandra("ks", "tb", SomeColumns("key", "value"))
  ssc.start()
  ssc.awaitTermination()
}
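
For reference, the target table used by saveToCassandra("ks", "tb", ...) would need a schema along these lines (hypothetical; the real definition isn't shown in the question, and the actual types and primary key may differ):

-- Hypothetical schema consistent with SomeColumns("key", "value")
CREATE KEYSPACE IF NOT EXISTS ks
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS ks.tb (
  key   text PRIMARY KEY,
  value text
);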

Output in Eclipse:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/12 20:23:25 INFO SparkContext: Running Spark version 2.1.1
17/06/12 20:23:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/06/12 20:23:26 WARN Utils: Your hostname, dev resolves to a loopback address: 127.0.0.1; using 192.168.1.41 instead (on interface wlo1)
17/06/12 20:23:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/06/12 20:23:26 INFO SecurityManager: Changing view acls to: dev
17/06/12 20:23:26 INFO SecurityManager: Changing modify acls to: dev
17/06/12 20:23:26 INFO SecurityManager: Changing view acls groups to: 
17/06/12 20:23:26 INFO SecurityManager: Changing modify acls groups to: 
17/06/12 20:23:26 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(dev); groups with view permissions: Set(); users  with modify permissions: Set(dev); groups with modify permissions: Set()
17/06/12 20:23:26 INFO Utils: Successfully started service 'sparkDriver' on port 39585.
17/06/12 20:23:26 INFO SparkEnv: Registering MapOutputTracker
17/06/12 20:23:26 INFO SparkEnv: Registering BlockManagerMaster
17/06/12 20:23:26 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/06/12 20:23:26 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/06/12 20:23:26 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8eb85c8e-216f-4b69-a567-3e7833cd675d
17/06/12 20:23:26 INFO MemoryStore: MemoryStore started with capacity 870.9 MB
17/06/12 20:23:26 INFO SparkEnv: Registering OutputCommitCoordinator
17/06/12 20:23:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/06/12 20:23:27 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.41:4040
17/06/12 20:23:27 INFO Executor: Starting executor ID driver on host localhost
17/06/12 20:23:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43841.
17/06/12 20:23:27 INFO NettyBlockTransferService: Server created on 192.168.1.41:43841
17/06/12 20:23:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/06/12 20:23:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.41:43841 with 870.9 MB RAM, BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.41, 43841, None)
17/06/12 20:23:27 WARN KafkaUtils: overriding enable.auto.commit to false for executor
17/06/12 20:23:27 WARN KafkaUtils: overriding auto.offset.reset to none for executor
17/06/12 20:23:27 WARN KafkaUtils: overriding executor group.id to spark-executor-use_a_separate_group_id_for_each_stream
17/06/12 20:23:27 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135

asked 12.06.2017 by Ivan Alex

Comments:

"Spark 2.1.1, kafka 0.10.2.1, spark-kafka 2.1.1, kafka-clients 0.10.2.1, spark cassandra connector 2.0.1" - Ivan Alex, 12.06.2017

"Do you see the Streaming tab in the Spark UI?" - maasg, 12.06.2017

"Good catch. Could you accept your own answer?" - maasg, 15.06.2017


Answers (1)


Sorry for the question, I solved it myself. I set

ssc.sparkContext.setLogLevel("DEBUG")

Spark was trying to resolve my VM's hostname. I had used the IP address in the Kafka configuration, but the broker evidently advertises itself by hostname, so the client still tries to connect by name. I added the address to /etc/hosts, and it works!
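
For illustration, the fix amounts to an entry like this on the machine running the Spark driver (the hostname below is hypothetical; use the name the Kafka broker actually advertises):

# /etc/hosts -- map the broker's advertised hostname to its IP
192.168.1.46   kafka-vm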

answered 12.06.2017 by Ivan Alex