Получить последнее сообщение из сценария консоли потребителя kafka

Мы можем получить каждое сообщение от Kafka, выполнив:

 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Есть ли способ получить только последнее сообщение?

РЕДАКТИРОВАТЬ:

Если вы просто хотите отслеживать некоторые сообщения (--max-messages 10) в своем потоке, удобная команда:

watch -n5 "./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic auction --max-messages 10"


person Paul Leclercq    schedule 16.10.2015    source источник
comment
Как насчет bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning --timeout-ms 10 | tail -n 1 (таймаут-мс убивает потребителя после того, как он все прочитал)   -  person natetitterton    schedule 13.07.2018
comment
увеличьте значение тайм-аута, если есть огромные данные   -  person Pardeep Sharma    schedule 20.11.2020
comment
@natetitterton, ваш ответ - получить все предметы не по теме, показать первый. не очень эффективный   -  person Mickey Perlstein    schedule 25.07.2021


Ответы (3)


Я не знаю об автоматизме, но, используя этот простой двухэтапный подход, он должен работать. Обратите внимание, что в моем случае это была разделенная тема, вы можете оставить для нее параметры, если у вас есть неразделенная тема:

1) Получите максимальное смещение для вашей темы (+ их разделы):

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic

mytopic:2:11
mytopic:1:7
mytopic:0:15
mytopic:3:8

2) Выберите тему (+ раздел) и укажите параметр offset - n в качестве параметра:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --offset 10 --partition 0  

Последние n сообщений темы будут выведены на консоль. В моем примере будет показано 5 сообщений (= 15-10).

person Aydin K.    schedule 26.04.2018
comment
Следует отметить, что этот метод в целом ненадежен, потому что не каждое смещение обязательно содержит сообщение. Могут быть пробелы, например, в случае сжатых журналов или прерванных транзакций. - person Brent Kerby; 15.07.2018
comment
Мы также можем использовать клиент для установки на ваш компьютер, чтобы получить эту информацию и многое другое: conduktor.io (спасибо Стефану Маареку и Стефану Дерозио) - person Paul Leclercq; 24.06.2019
comment
Почему мы не указываем здесь идентификатор потребителя? Зачеты AFAIK должны быть специфичными для группы потребителей, не так ли? - person Boris Mitioglov; 13.09.2019
comment
@BorisMitioglov: Смещения зависят от раздела. - person Aydin K.; 14.11.2019

С помощью KafkaCat (https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html) можно прочитать N последних сообщений в теме Apache Kafka.

person user16451796    schedule 15.07.2021

Я получил, вероятно, правильный ответ от какого-то поискового запроса http://grokbase.com/t/kafka/users/145x930s27/how-to-get-last-message

Там кто-то предлагает найти последнее смещение с помощью getOffsetBefore api, а затем использовать это смещение - 1 для выборки.

person Mihail Krivushin    schedule 26.06.2017
comment
Ссылка недействительна - person michal2616; 22.05.2020