Есть ли в Kafka 0.9 способ перечислить смещение для всех потребителей в группе потребителей?

Я использую новый Consumer API Kafka 0.9.

Я позволяю Kafka позаботиться о компенсации для потребителей. У меня есть потребители, работающие на нескольких машинах и читающие из одной и той же темы.

Я пытаюсь выяснить следующее:

  • Потребители, зарегистрированные в группе потребителей
  • Зачет каждого потребителя

Я думал, что отношения между потребителями и потребителями будут храниться в ZooKeeper. Я вижу узел потребителей в ZooKeeper, у него нет детей.

Смещения, насколько я могу судить, просматривая код, записываются в kafka, но я не могу сказать, в какую тему они пишутся?


person hba    schedule 30.01.2016    source источник


Ответы (3)


Смещение, если оно обрабатывается kafka, не хранится в zookeeper, оно сохраняется в вызове темы «__consumer_offsets- #» в папке kafka-logs.

Вы можете узнать смещение каждого потребителя, проверив поле смещения в KafkaRecords при выполнении poll (), если вам нужна дополнительная информация о проверке группы потребителей bin / kafka-consumer-groups.sh

Надеюсь, это поможет!

person Nautilus    schedule 31.01.2016
comment
Да, я могу читать ключи и значения из темы __consumer_offsets, однако я не уверен, какими они должны быть, как мне десериализовать ключ и значение? - person hba; 01.02.2016
comment
В этом нет необходимости, вы можете использовать bin / kafka-consumer-groups.sh, чтобы проверить эту информацию. - person Nautilus; 01.02.2016
comment
Я вижу, группа kafka-consumer-group считывает смещения из zookeeper. Не __consumer_offsets. Я очень запуталась ... - person hba; 03.02.2016
comment
Если вы используете нового потребителя, kafka должно быть значением по умолчанию для свойства offsets.storage, но на всякий случай добавьте это свойство offsets.storage = kafka - person Nautilus; 03.02.2016
comment
@hba вы должны передать флаг --new-consumer для чтения смещений из __consumer_offsets - person Sam; 27.07.2016

Кажется, что в теме __consumer_offsets, на которую указывает @nautilus, хранится как минимум 2 типа пар ключ-значение.

  1. Информация о групповых метаданных
  2. Смещение коммитов

Насколько я могу судить, Kafka использует свою собственную схему и сериализацию. Вы можете узнать больше об этих структурах, просмотрев kafka.coordinator.GroupMetadataManager:

  • GroupMetadataManager.OFFSET_COMMIT_KEY_SCHEMA
  • GroupMetadataManager.OFFSET_COMMIT_VALUE_SCHEMA_V0
  • GroupMetadataManager.GROUP_METADATA_KEY_SCHEMA
  • GroupMetadataManager.GROUP_METADATA_VALUE_SCHEMA_V0
person hba    schedule 02.02.2016

Как упоминает @hba, подробности кодирования / декодирования находятся в _ 1_ внизу. Найдите readMessageKey и два следующих метода. По сути, вам нужна последовательность вызовов вроде

import org.apache.kafka.common.protocol.types.Type;
...
ByteBuffer bb = ByteBuffer.wrap(consumerRecord.key())
short version = bb.getShort();
String group = (String)Type.String.read(bb);
String topic = (String)Type.String.read(bb);
int partition = (int)Type.INT32.read(bb);

Приятно то, что org.apache.kafka.common.protocol.types.Type является частью Java api, независимой от большого основного Jar. Уродливая часть заключается в том, что приведенный выше фрагмент кода не является полным. Существуют две версии каждого consumerRecord.key() и consumerRecord.value(), и одна из них должна имитировать декодирование вышеупомянутых методов. Ничего страшного, просто немного утомительно.

Если ваш проект может зависеть от scala-jar, полного kafka-jar и еще одного или двух jar-файлов, необходимых для Kafka, вы также можете использовать GroupMetadataManager.readMessageKey(bb) и два других метода для чтения ключа и значения. По крайней мере, в 0.9.0.1 его паблик.

person Harald    schedule 06.03.2016
comment
Не могли бы вы прояснить, что вы имеете в виду под двумя версиями каждого файла consumerRecord? Также не следует указывать здесь размер байтового буфера или, по крайней мере, как предотвратить следующую ошибку: Bufferunderflowexception: at Buffer.nextGetIndex () - person vbNewbie; 21.09.2018