o.apache.kafka.clients.NetworkClient - брокер начальной загрузки ‹hostname›: 9092 отключено

Я пытаюсь получать сообщения из темы Kafka, используя Spring Kafka consumer, но вижу ошибку ниже. Это отлично работает, когда я использую сообщения из темы kafka, настроенной на моем локальном компьютере -

[org.springframework.kafka.KafkaListenerEndpointContainer # 0-0-C-1] ПРЕДУПРЕЖДЕНИЕ o.apache.kafka.clients.NetworkClient - брокер начальной загрузки <hostname>:9092disconnected

Я могу читать сообщения с помощью командной строки

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ctp_verbose_amcs --from-beginning --zookeeper localhost:2181

Код

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Value(value = "${kafka.bootstrapAddress:localhost:9092}")
    private String bootstrapAddress;

    @Value(value = "${groupId:amcs-tas}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, Map<String, Object>> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new ConciseMessageDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Сообщение об ошибке в журнале сервера

[2017-09-20 14:33:44,448] ERROR Closing socket for <hostname>:9092-10.251.127.31:51014 because of error (kafka.network.Processor)
kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2
        at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
        at kafka.network.Processor.run(SocketServer.scala:413)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2
        at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
        at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)
        at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)
        at org.apache.kafka.common.requests.MetadataRequest.parse(MetadataRequest.java:96)
        at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:48)
        at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92)
        ... 10 more

person Punter Vicky    schedule 20.09.2017    source источник
comment
Вы действительно видите <hostname>? Или вы запутали фактическое имя хоста? В первом случае ваша собственность имеет плохую стоимость; в последнем случае это, вероятно, проблема с сетью.   -  person Gary Russell    schedule 20.09.2017
comment
Да, я вижу настоящее имя хоста. Я удалил его при создании этого поста.   -  person Punter Vicky    schedule 20.09.2017
comment
Проверьте журналы сервера и просмотрите трассировку сети.   -  person Gary Russell    schedule 20.09.2017
comment
Попробуйте консоль потребителя с настоящим брокером.   -  person Gary Russell    schedule 20.09.2017
comment
Извини, Гэри, что подразумевается под настоящим брокером? Я также добавил ошибку, которую вижу в журнале сервера.   -  person Punter Vicky    schedule 20.09.2017


Ответы (1)


Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2

Ваша версия клиента несовместима с версией брокера.

Матрицу совместимости см. на странице проекта внизу.

person Gary Russell    schedule 20.09.2017
comment
Спасибо Гэри, сейчас проверю - person Punter Vicky; 20.09.2017
comment
Я использую эту версию кафки - kafka_2.11-0.10.0.0. Я вижу, что на странице был предоставлен список совместимых клиентов и библиотек spring-kafka. Я использовал нижеприведенное, я больше не вижу ошибки, но сообщения, похоже, не потребляются из темы. Должен ли я использовать другую версию клиента для версии kafka, которую я предоставил? группа компиляции: 'org.apache.kafka', имя: 'kafka-clients', версия: '0.10.2.0' группа компиляции: 'org.springframework.kafka', имя: 'spring-kafka', версия: '1.2. 2. ВЫПУСК - person Punter Vicky; 20.09.2017
comment
Я не знаю, совместим ли клиент 10.2 с брокером 10.0. Похоже, это не так. Проверьте документацию Kafka. - person Gary Russell; 20.09.2017
comment
Привет, Гэри, извиняюсь за обновление нескольких ошибок в одном и том же потоке. При необходимости я могу создать новый вопрос. Я включил отладку и вижу эту ошибку Ошибка поиска координатора группы для группы amcs-tas: координатор группы недоступен. - person Punter Vicky; 20.09.2017
comment
Предлагаю вам задать для этого новый вопрос. - person Gary Russell; 21.09.2017