приложение kafka streams - игнорировать старые сообщения при перезапуске

Я имею дело с данными таймсерий для живого приложения. Так что старые данные не имеют значения. Я просто хочу обрабатывать данные, полученные после запуска потокового приложения, а не из ранее зафиксированного смещения. Как правильно игнорировать старые записи в приложении потока kafka после перезапуска?

В потребительском API kafka я обычно использовал метод seekToEnd() для перехода к последней записи. Есть ли эквивалентный механизм для потоков? Я хочу избежать фильтрации всех сообщений с момента последней фиксации, чтобы игнорировать старые сообщения.


person Stanley    schedule 15.11.2017    source источник
comment
Привет, Стэнли, ты нашел способ обойти это?   -  person Amanpreet Khurana    schedule 06.07.2018


Ответы (1)


Вы можете создать другого потребителя, используя Kafka Consumer API с groupId таким же, как applicationId для kafka-streams, и использовать этого потребителя для выполнения seekToEnd() перед запуском вашего потока. Отключите autoCommit для этого специального потребителя и зафиксируйте смещение вручную после seekToEnd(). Затем попробуйте запустить трансляцию.

Убедитесь, что поток не начался, пока не будут зафиксированы смещения от потребителя сброса.

person chanK    schedule 05.12.2017