Может ли кто-нибудь помочь мне с приведенными ниже запросами. Я использую kafka-clients-0.10.1.1(Single Node Single Broker)
Значение по умолчанию auto.create.topics.enable — true.
1. Я отправляю сообщение в тему, используя
kafkaProdcuer<String,String> producer> producer...
producer.send(new ProducerRecord<String, String>("my- topic","message"));
producer.close();
Для потребления:
kafkaConsumer<String,String> consumer....
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(200);
while(true){
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
Проблема в том, что когда я запускаю потребителя в первый раз, он не получает значений. И я должен запустить производителя и снова запустить потребителя, чтобы получить значения. Иногда мне приходится запускать производителя 3 раза. Почему это работает таким образом?
2.) enable.auto.commit=false
Может ли один и тот же потребитель прочитать сообщение несколько раз, если свойство enable.auto.commit имеет значение false?
3.) Учитывая мой потребительский код в 1-й точке. Как я могу разорвать цикл, я имею в виду. Как потребитель может узнать, что он прочитал все сообщения, а затем вызвать Consumer.close ()