Возможные причины повторного использования сообщений Kafka

Вчера я обнаружил из журнала, что кафка повторно принимал некоторые сообщения после того, как координатор группы Kafka инициировал ребалансировку группы. Эти сообщения были израсходованы два дня назад (подтверждено из журнала).

В журнале было зарегистрировано два других перебалансирования, но они больше не считали сообщения. Так почему же первое изменение бланков вызовет повторное потребление сообщений? Какие были проблемы?

Я использую клиент golang kafka. вот код

config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest 

и мы обрабатываем сообщения перед тем, как запросить сообщения, поэтому, похоже, мы используем стратегию «Отправить хотя бы один раз» для kafka. У нас есть три брокера на одной машине и только один потребительский поток (процедура перехода) на другой машине.

Какие объяснения этому феномену? Я думаю, что сообщения, должно быть, были зафиксированы, потому что они были потреблены два дня назад, или почему кафка будет хранить смещения более двух дней без фиксации?

Пример использования кода:

func (consumer *Consumer) ConsumeClaim(session 
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

for message := range claim.Messages() {
    realHanlder(message)   // consumed data here
    session.MarkMessage(message, "") // mark offset
}

return nil
}

Добавлен:

  1. Ребалансировка произошла после перезапуска приложения. Было два других перезапуска, которые не были восстановлены.

  2. конфиги кафки

    log.retention.check.interval.ms = 300000
    log.retention.hours = 168
    zookeeper.connection.timeout.ms = 6000
    group.initial.rebalance.delay.ms = 0
    delete.topic.enable = true
    auto.create.topics.enable = false


person Wallace    schedule 02.07.2019    source источник
comment
Когда вы используете самое старое смещение, вы получите сообщения от самого старого, которое вы не зафиксировали. Не могли бы вы рассказать о потребляющей фазе вашего кода?   -  person Parham Alvani    schedule 02.07.2019
comment
Какова ваша политика хранения на сервере? И меняется ли ваша групповая идентификация при ребалансировке?   -  person Parham Alvani    schedule 02.07.2019


Ответы (1)


Прочитав исходный код как клиента golang saram, так и сервера kafka, наконец, я нашел причину, как показано ниже

  1. Время хранения смещения группы потребителей составляет 24 часа, что является настройкой по умолчанию kafka, а время хранения журнала явно составляет 7 дней. установлен нами.

  2. Мое серверное приложение работает в тестовой среде, которую могут посещать немногие люди, что означает, что может быть несколько сообщений, созданных производителем kafka, а затем группа потребителей имеет несколько сообщений для потребления, поэтому потребитель может не фиксировать какое-либо смещение в течение длительного времени.

  3. Когда смещение потребления не обновляется более 24 часов из-за конфигурации смещения, брокер / координатор kafka удаляет смещение потребления из разделов. В следующий раз, когда saram запрашивает у брокера kafka, где находится смещение, конечно, клиент ничего не получает. Обратите внимание, что мы используем sarama.OffsetOldest в качестве начального значения, тогда клиент sarama будет потреблять сообщения с начала сообщений, хранимых брокером kafka, что приводит к повторному потреблению сообщений, и это, вероятно, произойдет из-за сохранения журнала 7 дней

person Wallace    schedule 10.08.2019