org.apache.spark.SparkException: не удалось найти смещения лидера для Set ([test-topic, 0])

Я пытаюсь использовать платформу Confluent и делаю высокоуровневые запросы Kafka к конечной точке REST, используя это code в качестве примера.

Я использую следующие параметры Kafka:

val kafkaParams = Map(
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081",
  "group.id" -> "EventConsumer",
  "auto.offset.reset" -> "smallest"
)

Это ошибка, которую я получаю, когда пытаюсь запустить код. Ошибка возникает в строке:

@transient val kafkaStream: DStream[(String, Object)] =
  KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
    ssc, kafkaParams, Set(topic)
  )

Исключение в потоке "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException org.apache.spark.SparkException: не удалось найти смещения лидера для Set ([test-topic, 0]) в org.apache .spark.streaming.kafka.KafkaCluster $$ anonfun $ checkErrors $ 1.apply (KafkaCluster.scala: 366) в org.apache.spark.streaming.kafka.KafkaCluster $$ anonfun $ checkErrors $ 1.apply (KafkaCluster.scala: 366) в scala.util.Either.fold (Either.scala: 98) в org.apache.spark.streaming.kafka.KafkaCluster $ .checkErrors (KafkaCluster.scala: 365) в org.apache.spark.streaming.kafka.KafkaUtils $ .getFromOffsets (KafkaUtils.scala: 222) в org.apache.spark.streaming.kafka.KafkaUtils $ .createDirectStream (KafkaUtils.scala: 484) в kafka.EventsConsumer $ .delayedEndpoint $ kafkamer $ EventsConsumer) $ 1 (EventsConsumer) $ 1 (EventsConsumer). в kafka.EventsConsumer $ delayedInit $ body.apply (EventsConsumer.scala: 22) в scala.Function0 $ class.apply $ mcV $ sp (Function0.scala: 34) в scala.runtime.AbstractFunctio n0.apply $ mcV $ sp (AbstractFunction0.scala: 12) в scala.App $$ anonfun $ main $ 1.apply (App.scala: 76) в scala.App $$ anonfun $ main $ 1.apply (App.scala: 76) в scala.collection.immutable.List.foreach (List.scala: 381) в scala.collection.generic.TraversableForwarder $ class.foreach (TraversableForwarder.scala: 35) в scala.App $ class.main (App.scala : 76) в kafka.EventsConsumer $ .main (EventsConsumer.scala: 22) в kafka.EventsConsumer.main (EventsConsumer.scala) в sun.reflect.NativeMethodAccessorImpl.invoke0 (Собственный метод) в sun.reflect.NativeImplative.invoke0 (собственный метод) в sun.reflect.voccessorNativeMethoplative. .java: 62) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) в com.intellij.rt.execution.application.AppMain. main (AppMain.java:147)

ОБНОВИТЬ:

Я попытался изменить localhost на IP, но проблема все равно осталась.


person Lobsterrrr    schedule 05.09.2016    source источник
comment
какое-нибудь решение для этого?   -  person navins    schedule 24.11.2016


Ответы (1)


Похоже, лидера нет в разделе темы. Попробуйте описать тему и проверьте, есть ли лидер для раздела 0 тестовой темы. Это происходит, если все реплики раздела не работают. Если у вас коэффициент репликации 1, то это наиболее вероятная причина.

person mrnakumar    schedule 06.09.2016
comment
Я пытаюсь получить содержимое темы Kafka с помощью curl -i -X GET "Accept: application/vnd.kafka.avro.v1+json" http://localhost:8082/topics/test-topic и получил ответ {"name":"test-topic","configs":{},"partitions":[{"partition":0,"leader":0,"replicas":[{"broker":0,"leader":true,"in_sync":true}]}]}. Итак, предполагаю, что с перегородкой проблем нет. Как узнать подробности? - person Lobsterrrr; 06.09.2016