Как настроить kafka так, чтобы у нас была возможность читать с самого раннего, самого последнего, а также с любого заданного смещения?

Я знаю о настройке kafka для чтения с самого раннего или последнего сообщения. Как мы можем включить дополнительную опцию, если мне нужно прочитать предыдущее смещение? Причина, по которой мне нужно сделать это, заключается в том, что более ранние сообщения, которые были прочитаны, должны быть обработаны снова из-за некоторой ошибки в логике обработки ранее.


person SHILPA AR    schedule 03.08.2017    source источник


Ответы (2)


В клиенте java kafka есть несколько методов для потребителя kafka, которые можно использовать для указания следующей позиции потребления.

public void seek(раздел TopicPartition, длинное смещение)

Переопределяет смещения выборки, которые потребитель будет использовать при следующем опросе (тайм-аут). Если этот API вызывается для одного и того же раздела более одного раза, при следующем опросе() будет использовано самое последнее смещение. Обратите внимание, что вы можете потерять данные, если этот API будет произвольно использоваться в середине потребления для сброса смещений выборки.

Этого достаточно, а также есть seekToBeginning и seekToEnd.

person GuangshengZuo    schedule 03.08.2017
comment
Если есть 3 раздела и последние смещения 12, 13 и 15, в случае, если мы хотим прочитать все сообщения с определенной временной метки, как нам это сделать? - person SHILPA AR; 05.08.2017
comment
Невозможно прочитать сообщение с меткой времени, есть просто смещение. вы можете прочитать все сообщение, а затем обработать сообщение, которое вы хотите, если сообщение содержит значение метки времени. - person GuangshengZuo; 05.08.2017
comment
Вы хотите сказать, прочитайте каждое сообщение и сравните его внутри моего скрипта с отметкой времени, которую я ищу? - person SHILPA AR; 08.08.2017
comment
да, Kafka не поддерживает эту функцию, вам нужно сделать это, написав код. - person GuangshengZuo; 08.08.2017
comment
В Kafka 0.11 вы можете получить смещения временных меток в клиенте Java. См. kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/. Вы также можете использовать административный скрипт bin/kafka-consumer-groups --reset-offsets для внешнего изменения смещений, хранящихся в разделе потребительских смещений Kafka. Больше нет необходимости использовать zookeeper для хранения смещения (начиная с версии 0.9). - person Hans Jespersen; 31.08.2017

Я пытаюсь ответить на похожий, но не совсем тот же вопрос, так что давайте посмотрим, может ли моя информация вам помочь.

Во-первых, Я работал над этим другим вопросом/ответом SO

Короче говоря, вы хотите зафиксировать свои смещения, и наиболее распространенным решением для этого является ZooKeeper. Поэтому, если ваш потребитель сталкивается с ошибкой или ему необходимо завершить работу, он может возобновить работу с того места, на котором остановился.

Сам я работаю с очень большим потоком большого объема, и мой потребитель (для теста) должен каждый раз начинать с самого хвоста. В документации указано, что я должен использовать KafkaConsumer seek, чтобы объявить мою отправную точку.

Я постараюсь обновить свои выводы здесь, как только они станут успешными и надежными. Наверняка это решаемая проблема.

person J Mac    schedule 31.08.2017
comment
Наиболее распространенное место для хранения смещений, начиная с версии 0.9, — это сама Kafka (в теме __consumer_offsets). Zookeeper использовался только для смещений в старом потребительском API. - person Hans Jespersen; 31.08.2017