Apache Kafka: состояние потребителя

Я прочитал документацию на веб-сайте Kafka, но после попытки реализовать полный минимальный пример (производитель -> кафка -> потребитель) мне не очень понятно, как нужно обрабатывать «состояние потребителя», смещение.

Некоторая информация

  1. Я использую HighLevel API (Java)
  2. Мой потребитель - это простой класс с Main, в основном тот же, что можно найти на странице быстрого запуска Kafka.
  3. Я использую Zookeeper
  4. Я использую одного брокера

Теперь в документации говорится, что потребитель HighLevel API сохраняет свое состояние с помощью zookeeper, поэтому я ожидал, что смещение и, следовательно, состояние потребителя будет поддерживаться между

  • Брокер Kafka перезапускается
  • Потребительские перезагрузки

Но, к сожалению, это не так: каждый раз, когда я перезапускаю брокера или потребителя, все сообщения доставляются повторно. Теперь, наверное, это глупые вопросы, но

  1. В случае перезапуска Kafka: я понял, что это зависит от потребителя, чтобы сохранить его состояние, поэтому, вероятно, когда брокер (повторно) запустится, повторно отправьте все (!) сообщения, и потребитель решит, что потреблять .. .это правильно? Если да, что произойдет, если у меня будет 10.0000.0000 сообщений?

  2. В случае перезапуска потребителя JVM: если состояние сохраняется в Zookeeper, почему сообщения доставляются повторно? Возможно ли, что новая JVM будет иметь другую «идентичность» потребителя? И как в этом случае привязать предыдущую личность?


person Andrea    schedule 07.02.2013    source источник


Ответы (3)


Да, потребитель отвечает за сохранение своего состояния, а Java высокого уровня Consumer сохраняет свое состояние в zookeeper.

Скорее всего, вы не указали свойство конфигурации groupId. В этой ситуации kafka генерирует случайные groupId.

Также возможно, что вы отключили свойство конфигурации autocommit.enable.

Полный справочник конфигурации Kafka можно найти на этой странице: http://kafka.apache.org/configuration.html под заголовком "Важные свойства конфигурации для потребителя высокого уровня".

person Wildfire    schedule 12.02.2013

чтобы ответить на исходный вопрос: использование groupId помогает избежать ситуации "повторного использования всех сообщений с самого начала"

если вы измените groupId, вы получите все сообщения с момента создания очереди (или с момента последней очистки данных на основе политики хранения журналов kafka)

не путайте это с флагом kafka-console-consumer "--from-begin" (который устанавливает параметр auto.offset.reset), который позволяет выбирать между вариантами 1 и 2 ниже:

1) потреблять новые сообщения с момента использования последнего сообщения (НЕ с начала времени, когда очередь kafka была изначально создана):

props.put ("auto.offset.reset", "наименьший");

2) потреблять новые сообщения с момента запуска JVM подписчика (в этом случае вы рискуете пропустить сообщения, помещенные в очередь, пока подписчик не работает и не слушает очередь):

props.put ("auto.offset.reset", "самый большой");


примечание: приведенное ниже только косвенно связано с исходным вопросом

для более сложного варианта использования - если вы пытаетесь программно установить смещение потребителя для воспроизведения сообщений, начиная с определенного времени, - потребуется использовать API SimpleConsumer, как показано в https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example, чтобы найти наименьшее смещение для воспроизведения от правого брокера / раздела. По сути, это заменяет zookeeper нашей собственной логикой FindLeader. очень сложно.

для этого варианта использования (произвольное воспроизведение сообщений, начиная с определенного времени, указанного пользователем) мы решили хранить локальный кеш сообщений и управлять смещениями локально вместо использования api управления смещением kafka (что потребовало бы повторной реализации хорошего фрагмента функциональности zookeeper с SimpleConsumer).

Т.е. относитесь к кафке как к "почтальону", когда сообщение доставлено, оно попадает в локальный почтовый ящик, и в случае, если нам нужно вернуться к определенному смещению в прошлом и, скажем, воспроизвести сообщения (которые уже были использованы), например в случае ошибки приложения потребителя мы не возвращаемся в «почтовое отделение» (брокеры kafka), чтобы выяснить правильный порядок доставки, а управляем им локально.

конец примечания

person alex    schedule 11.12.2013
comment
Не могли бы вы подробнее рассказать, как вы управляете зачетами локально, а не из Kafka? Например, как вы определяете и рассчитываете смещения для каждого отправляемого сообщения, которое затем будет использовано. - person David; 17.06.2015
comment
после использования - добавьте текущую метку времени как идентификатор сообщения и сохраните сообщение как двоичный blob (он отправляется в формате avro, и мы не десериализуем его на данный момент) в hsql (с сохранением на диск), или вы можете использовать apache phoenix и заархивировать его там в двоичном формате с двумя столбцами ID (отметка времени), Сообщение (VARBINARY) - person alex; 18.06.2015
comment
Но как это связано со смещением сообщения? Значение смещения Kafka не является меткой времени или двоичной кодировкой сообщения или хешем того и другого? Я все еще новичок в Кафке, так что извините за мое незнание. - person David; 19.06.2015
comment
это не так, нас не волнует смещение кафки. мы заменяем его нашим собственным смещением в виде локальной временной метки при получении сообщения, а затем используем его для индексации сообщений в локальном архиве db, который периодически очищается. если нам нужно воспроизвести последовательность сообщений, полученных в течение определенного (недавнего) диапазона времени, он выполняет свою работу. Мы читаем сообщения из базы данных и отправляем по назначению (в том же порядке, в котором они были получены изначально от кафки). - person alex; 19.06.2015
comment
чтобы уточнить - мои комментарии выше относятся к случаю использования специального воспроизведения сообщений, начиная с определенного пользователем времени, что отличается от исходного вопроса, связанного с работой kafka по умолчанию, я обновил свой ответ, чтобы отразить это. - person alex; 19.06.2015

Похоже, я плохо читал ... все дело в странице конфигурации. В частности, оба моих вопроса были решены установкой флага autooffset.reset, который по умолчанию имеет значение «наименьший» и, следовательно, вызывает описанные эффекты.

Теперь, когда в качестве значения указано «наибольшее», все работает, как ожидалось, как в случае перезапуска потребителя, так и в случае перезапуска брокера, потому что смещение всегда является наибольшим.

person Andrea    schedule 13.02.2013