Вчера я обнаружил из журнала, что кафка повторно принимал некоторые сообщения после того, как координатор группы Kafka инициировал ребалансировку группы. Эти сообщения были израсходованы два дня назад (подтверждено из журнала).
В журнале было зарегистрировано два других перебалансирования, но они больше не считали сообщения. Так почему же первое изменение бланков вызовет повторное потребление сообщений? Какие были проблемы?
Я использую клиент golang kafka. вот код
config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest
и мы обрабатываем сообщения перед тем, как запросить сообщения, поэтому, похоже, мы используем стратегию «Отправить хотя бы один раз» для kafka. У нас есть три брокера на одной машине и только один потребительский поток (процедура перехода) на другой машине.
Какие объяснения этому феномену? Я думаю, что сообщения, должно быть, были зафиксированы, потому что они были потреблены два дня назад, или почему кафка будет хранить смещения более двух дней без фиксации?
Пример использования кода:
func (consumer *Consumer) ConsumeClaim(session
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
realHanlder(message) // consumed data here
session.MarkMessage(message, "") // mark offset
}
return nil
}
Добавлен:
Ребалансировка произошла после перезапуска приложения. Было два других перезапуска, которые не были восстановлены.
конфиги кафки
log.retention.check.interval.ms = 300000
log.retention.hours = 168
zookeeper.connection.timeout.ms = 6000
group.initial.rebalance.delay.ms = 0
delete.topic.enable = true
auto.create.topics.enable = false