Я создал приложение многопоточного потребителя для работы с различными разделами. Изучая различные блоги, я узнал о свойстве max.poll.records, чтобы получить контроль над набором записей из данной темы, раздела (чтобы он мог быстро выйти из цикла Records и, следовательно, вызвать cons.poll () чтобы остаться в живых)
Проблема в том, что моя логика обработки требует времени для обработки каждой записи. после запуска Cons-2 оба начинают работать с одним и тем же разделом, так как Cons-1 все еще не пошел на ребалансировку (т.е. cons.poll () еще не произошло).
При увеличении числа потребителей, чтобы они могли перебалансировать себя, cons.poll () не будет выполняться, пока не будут обработаны все записи.
Я не могу использовать session.timeout.ms, поскольку новый потребитель может также начать работать с тем же разделом, что и Cons-1.
Я пробовал установить свойство, используя:
props.put("max.poll.records",1);
props.put("max.poll.records","1");
но ни то, ни другое не изменили нет. записей из опроса.
Я использую Apache Kafka 9 и API ниже.
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>0.9.0.1_1</version>
</dependency>