невозможно установить max.poll.records для потребителя kafka, где cons.poll по-прежнему возвращает все записи в разделе

Я создал приложение многопоточного потребителя для работы с различными разделами. Изучая различные блоги, я узнал о свойстве max.poll.records, чтобы получить контроль над набором записей из данной темы, раздела (чтобы он мог быстро выйти из цикла Records и, следовательно, вызвать cons.poll () чтобы остаться в живых)

Проблема в том, что моя логика обработки требует времени для обработки каждой записи. после запуска Cons-2 оба начинают работать с одним и тем же разделом, так как Cons-1 все еще не пошел на ребалансировку (т.е. cons.poll () еще не произошло).

При увеличении числа потребителей, чтобы они могли перебалансировать себя, cons.poll () не будет выполняться, пока не будут обработаны все записи.

Я не могу использовать session.timeout.ms, поскольку новый потребитель может также начать работать с тем же разделом, что и Cons-1.

Я пробовал установить свойство, используя:

props.put("max.poll.records",1);
props.put("max.poll.records","1");

но ни то, ни другое не изменили нет. записей из опроса.

Я использую Apache Kafka 9 и API ниже.

<dependency>
    <groupId>org.apache.servicemix.bundles</groupId>
    <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
    <version>0.9.0.1_1</version>
</dependency>

person usman    schedule 24.05.2016    source источник


Ответы (1)


max.poll.records свойство выпущено в Kafka-0.10.0. Он недоступен в версии Kafka 0.9.0.1. См. Задачу KAFKA-3007 в примечаниях к версии.

Если ваша обработка записей заняла много времени, может оказаться полезной ссылка ниже.

AdvancedConsumer.java

person Kamal Chandraprakash    schedule 25.05.2016
comment
здесь проблема в том, что у вас есть однопоточный потребительский поток, который будет обрабатывать каждую запись. где я использую несколько записей. - person usman; 26.05.2016
comment
Я не понял твоей точки зрения. По сути, вам нужно запустить несколько потребителей, принадлежащих к одной группе. При этом одновременно обрабатываются записи из нескольких разделов. - person Kamal Chandraprakash; 26.05.2016
comment
собственно, это действительно многопользовательская программа. Мне также нужно потреблять многопоточные записи. не тратить время на обработку единичных записей. и, следовательно, смещение фиксации на основе обрабатываемых записей. Тем не менее, в вашем коде есть довольно новые полезные вещи, и вы их изучили. - person usman; 26.05.2016