Как заставить потребителя kafka читать с последнего использованного смещения, но не с начала

Я новичок в kafka и пытаюсь понять, есть ли способ читать сообщения с последнего использованного смещения, но не с начала.

Я пишу примерный случай, чтобы мое намерение не отклонялось.

Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at 7:30 PM, without from-beginning.
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM

Есть ли способ получить сообщения, созданные с последнего использованного смещения.?


person Srini    schedule 12.11.2015    source источник


Ответы (3)


Я новичок в kafka и пытаюсь понять, есть ли способ читать сообщения с последнего использованного смещения, но не с начала.

Да, можно использовать консольного потребителя для чтения с последнего использованного смещения. Вы должны добавить флаг consumer.config при вызове kafka-console-consumer.

Пример:-

[root@sandbox bin]# ./kafka-console-consumer.sh --topic test1 --zookeeper localhost:2181 --consumer.config /home/mrnakumar/consumer.properties

Здесь /home/mrnakumar/consumer.properties - это файл, содержащий group.id. Вот как выглядит /home/mrnakumar/consumer.properties: -

group.id = consoleGroup

Без использования consumer.config можно читать либо с начала [с помощью --from-begin], либо только с конца журнала. Конец журнала означает все сообщения, опубликованные после запуска потребителя.

person mrnakumar    schedule 14.11.2015
comment
Да, если мы указали какой-либо идентификатор группы, то данные будут считаны из последней использованной точки. Если мы работали без идентификатора группы, он учитывает только данные после запуска .. Спасибо .. - person Srini; 15.11.2015
comment
Можно ли также зафиксировать смещения через консольного потребителя для определенной группы потребителей? - person clausmc; 19.09.2016
comment
Это не работает, если auto.offset.reset=earliest не установлен в consumer.properties - person STaefi; 22.07.2019

Установка auto.offset.reset=earliest И фиксированного group.id=something в конфигурации потребителя запустит потребителя на последнем зафиксированном смещении. В вашем случае он должен начать потреблять при первом сообщении в 7:20. Если вы хотите, чтобы он начал читать сообщения, отправленные ПОСЛЕ запуска, auto.offset.reset=latest проигнорирует 10 сообщений, отправленных в 7:20, и прочитает все, которые приходят после его запуска.

Если вы хотите, чтобы он запускался с самого начала, вы должны либо вызвать seekToBeginning после первого consumer.poll(), либо изменить идентификатор группы потребителей на что-то уникальное.

person George Smith    schedule 30.11.2018
comment
Просто чтобы убедиться, что я хорошо понимаю. Когда вы говорите: «Если вы хотите, чтобы все началось с самого начала, измените идентификатор группы потребителей на что-то уникальное». Означает ли это, что, когда мы хотим прочитать последнюю версию, мы должны использовать идентификатор группы, который уже существует? - person homeOfTheWizard; 16.07.2020

Вы должны установить параметр auto.offset.reset в конфигурации вашего потребителя на largest, чтобы он читал все сообщения после последнего зафиксированного смещения.

person codejitsu    schedule 12.11.2015
comment
@Srini, вам не нужно устанавливать для этого свойства числовое значение, значение этого свойства должно быть «самым большим», чтобы использовать его с конца вашего потока. - person codejitsu; 13.11.2015
comment
работал у меня. Спасибо! Я только что добавил в файл consumer.properties следующую строку: auto.offset.reset = large - person Ofer Eliassaf; 30.03.2016
comment
может кто-нибудь объяснить, почему самый большой? Самый ранний работает нормально, если потребитель не включен, когда эти 10 сообщений отправлены и когда потребитель включен, с auto.offset.reset = самое раннее его использование всех 10 сообщений, которые не были использованы - person driven_spider; 20.12.2019