kafka Client API вопросы

Может ли кто-нибудь помочь мне с приведенными ниже запросами. Я использую kafka-clients-0.10.1.1(Single Node Single Broker)

Значение по умолчанию auto.create.topics.enable — true.

1. Я отправляю сообщение в тему, используя

    kafkaProdcuer<String,String> producer> producer...
    producer.send(new ProducerRecord<String, String>("my- topic","message"));
    producer.close();

Для потребления:

    kafkaConsumer<String,String> consumer....
    consumer.subscribe(Arrays.asList("my-topic"));
    ConsumerRecords<String, String> records = consumer.poll(200);

    while(true){
     for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
         }
     }

Проблема в том, что когда я запускаю потребителя в первый раз, он не получает значений. И я должен запустить производителя и снова запустить потребителя, чтобы получить значения. Иногда мне приходится запускать производителя 3 раза. Почему это работает таким образом?

2.) enable.auto.commit=false

Может ли один и тот же потребитель прочитать сообщение несколько раз, если свойство enable.auto.commit имеет значение false?

3.) Учитывая мой потребительский код в 1-й точке. Как я могу разорвать цикл, я имею в виду. Как потребитель может узнать, что он прочитал все сообщения, а затем вызвать Consumer.close ()


person jena84    schedule 29.12.2016    source источник
comment
В корзине kafka есть консольный потребитель, вы можете попробовать его, пока ваш собственный потребитель не может потреблять данные. И попробуйте добавить производителя.flush(), если это возможно. Для вашего вопроса 3 программа потоковой передачи не может узнать конец пакета, но вы можете установить поток тайм-аута для отслеживания тайм-аута без использования данных.   -  person Lhfcws    schedule 29.12.2016
comment
Да, я тестировал его с потребителем bin, он выдает ошибку на время. Ошибка при извлечении метаданных с идентификатором корреляции 1: {my-topic-106 = LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)   -  person jena84    schedule 29.12.2016
comment
Вы производили данные в последнее время, прежде чем потреблять данные? По умолчанию Kafka хранит ваши данные только в течение 3 дней.   -  person Lhfcws    schedule 30.12.2016


Ответы (1)


1) Вы всегда используете один и тот же group.id в потребителе? Вы производите, прежде чем потреблять? Это может быть связано с группами потребителей и управлением компенсацией. См. этот ответ о смещении потребителей.

2) Не уверен, что вы имеете в виду чтение дубликатов намеренно или случайно. Вы всегда можете снова прочитать то же сообщение, перейдя на эту позицию, если сообщение не было удалено из-за политики хранения темы. Если вы случайно имеете в виду, что для автоматической фиксации установлено значение false, это просто означает, что потребитель не будет фиксировать смещения для вас, вы должны сделать это вручную, вызвав commitSync() или commitAsync(). В любом случае все еще есть вероятность того, что ваш потребитель обработает сообщение и выйдет из строя перед фиксацией, в этом случае, когда потребитель восстановится, он снова прочитает эти обработанные, но не зафиксированные сообщения. Если вы хотите точно один раз семантически, вам нужно сделать что-то еще, например атомарно сохранять смещения с обработанными сообщениями.

3) Как уже упоминалось Lhfcws, в потоке нет такого понятия, как "все сообщения". Вот некоторые вещи (трюки), которые вы можете сделать:

  • Вы можете проверить, возвращен ли список записей опросом, если он пуст, и после определенного заданного количества раз прервать цикл и выйти.
  • Если сообщения упорядочены (вы читаете из одного раздела), вы можете отправить своего рода специальное сообщение END_OF_DATA, когда вы его увидите, вы закроете потребителя.
  • Вы можете заставить потребителя прочитать несколько сообщений, а затем выйти, в следующий раз он продолжит с последнего зафиксированного смещения.
person Luciano Afranllie    schedule 29.12.2016
comment
Спасибо Lhfcws и Luciano. Теперь я понял 2-й и 3-й пункт. Что касается 1-го пункта, я запускаю потребителя сразу после производителя. Я не меняю группу потребителей. Я не создаю тему с помощью утилиты bin. Я предполагаю, что code product.send создаст тему. bootstrap.servers=localhost:9092 group.id=test enable.auto.commit=true - person jena84; 30.12.2016
comment
jena84, попробуйте установить auto.offset.reset на самое раннее значение в конфигурации потребителя и повторите попытку. Кроме того, после запуска потребителя дождитесь завершения ребалансировки. - person Luciano Afranllie; 30.12.2016
comment
Потрясающий!!! Это сработало. Большое спасибо. Это из-за причины, указанной в ссылке, которую вы дали об управлении смещением. Я также пытался поставить наименьший. Это не позволило мне. Это из-за нового потребительского API? - person jena84; 30.12.2016
comment
Рад, что это сработало. Да, новый потребительский API использует самый ранний, в то время как старый потребитель использует наименьший. Если вы используете клиенты версии 0.10, позаботьтесь об использовании новых конфигураций потребителей (kafka.apache.org/documentation/ #newconsumerconfigs) - person Luciano Afranllie; 30.12.2016