Я прочитал документацию на веб-сайте Kafka, но после попытки реализовать полный минимальный пример (производитель -> кафка -> потребитель) мне не очень понятно, как нужно обрабатывать «состояние потребителя», смещение.
Некоторая информация
- Я использую HighLevel API (Java)
- Мой потребитель - это простой класс с Main, в основном тот же, что можно найти на странице быстрого запуска Kafka.
- Я использую Zookeeper
- Я использую одного брокера
Теперь в документации говорится, что потребитель HighLevel API сохраняет свое состояние с помощью zookeeper, поэтому я ожидал, что смещение и, следовательно, состояние потребителя будет поддерживаться между
- Брокер Kafka перезапускается
- Потребительские перезагрузки
Но, к сожалению, это не так: каждый раз, когда я перезапускаю брокера или потребителя, все сообщения доставляются повторно. Теперь, наверное, это глупые вопросы, но
В случае перезапуска Kafka: я понял, что это зависит от потребителя, чтобы сохранить его состояние, поэтому, вероятно, когда брокер (повторно) запустится, повторно отправьте все (!) сообщения, и потребитель решит, что потреблять .. .это правильно? Если да, что произойдет, если у меня будет 10.0000.0000 сообщений?
В случае перезапуска потребителя JVM: если состояние сохраняется в Zookeeper, почему сообщения доставляются повторно? Возможно ли, что новая JVM будет иметь другую «идентичность» потребителя? И как в этом случае привязать предыдущую личность?