Как мне узнать, была ли моя запись зафиксирована вручную с помощью Spring Kafka

Я хотел бы знать, как работает фиксация, когда AckMode установлен на MANUAL в spring kafka.

Ниже показано свойство, которое я установил в KafkaConfig containerProperties.setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL);

Код listener

@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
    public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
        countDownLatch.countDown();     
        acknowledgment.acknowledge();
}

Я делаю acknowledgement в соответствии с документацией spring kafka, но это будет означать только то, что мое сообщение помечено как отправлено, но не использовано (это я так понимаю).

  1. В этом случае следует вызвать метод commitSync(). Если да, то откуда мне это вызывать, поскольку мне нужно получить ссылку на KafkaConsumer. Если нет, как это работает внутри, могу ли я отследить это?

  2. Возвращается commitId или какое-то другое значение? Моя идея состоит в том, чтобы узнать, используется ли конкретная потребительская запись или нет. Я хочу сохранить значение для внутреннего отслеживания.

  3. Поддерживает ли kafka внутренне какое-либо состояние в записи потребителя, например (Подтверждено, Выполнено, Не выполнено), которое может быть полезно для классификации.

Это действительно помогло бы мне различать, сколько записей потреблено, а сколько ожидает обработки и их состояния.


person srikant_mantha    schedule 23.02.2017    source источник


Ответы (1)


Могу ответить на первый вопрос. Все остальное выглядит как история непосредственно для Apache Kafka.

Поскольку мы не можем выполнять commit откуда хотели, а только из того же потока, который выполняет consumer.poll(), мы сохраняем все запросы на фиксацию во внутренней очереди KafkaMessageListenerContainer и проверяем это в основном цикле потребителя перед выполнением this.consumer.poll().

Даже если вы используете MANUAL_IMMEDIATE, реальный consumer.commitSync() выполняется в другом потоке, отличном от вашего acknowledgment.acknowledge().

OTOH передает API там в Consumer выглядит так:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);

Итак, нет commitId крючков, которые нужно было бы ловить.

Я думаю, что в Apache Kafka нет такого понятия, как Not Committed или что-то еще. Данные всегда присутствуют в журнале тем и не удаляются оттуда до тех пор, пока не будет выполнена конкретная операция администратора или конфигурация сжатия.

Я думаю, что функция commit offset полностью связана с целью consumer group и, согласно JavaDocs, у нас есть:

* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1.

Итак, когда ваш потребитель умер, он перезапустится с последнего зафиксированного смещения для своей группы. Другая группа может читать те же данные, но с другого смещения. Я думаю, что именно поэтому их API не обеспечивает привязки к фактическому состоянию. Такого просто нет!

person Artem Bilan    schedule 23.02.2017
comment
Спасибо Артему за развернутый ответ. Итак, из приведенного выше ответа я понял, что consumer.commitSync() происходит в другом потоке, когда мы вызываем acknowledgment.acknowledge(). Следовательно, это гарантирует, что фиксация происходит. Такое же поведение и для commitAsync ()? Я надеюсь, что то же самое и с AckMode.MANUAL, поскольку я не использую MANUAL_IMMEDIATE - person srikant_mantha; 27.02.2017
comment
Да, это правда. Любые Consumer операции выполняются в одном потоке. MANUAL_IMMEDIATE отличается от MANUAL только немедленным consumer.wakeup() прерыванием текущего опроса. - person Artem Bilan; 04.05.2017