Acknowledgement.acknowledge () выбрасывает исключение в spring-kafka @KafkaListener

Когда я устанавливаю для enable.auto.commit значение false и пытаюсь вручную зафиксировать смещение с использованием аннотации spring-kafka @KafkaListener, я получаю org.springframework.kafka.listener.ListenerExecutionFailedException: метод прослушивателя не может быть вызван с входящим сообщение

У меня очень простой код:

@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
    System.out.println("Received Messasge in group 'foo': " + message);

    // TODO: Do something with the message
}

И когда я отправляю сообщение от производителя, я получаю следующее исключение:

org.springframework.kafka.listener.ListenerExecutionFailedException: метод прослушивателя не может быть вызван с входящим сообщением.

Детали обработчика конечной точки:

Метод [public void com. ****. *****. *******. KafkaMessageListener.listenFooGroup (java.lang.String, org.springframework.kafka.support.Acknowledgment)]

Bean [com.****.*****.*******.KafkaMessageListener@5856dbe4]; вложенное исключение - org.springframework.messaging.converter.MessageConversionException: не удается обработать сообщение; вложенное исключение - org.springframework.messaging.converter.MessageConversionException: не удается преобразовать из [java.lang.String] в [org.springframework.kafka.support.Acknowledgment] для GenericMessage [payload = test, headers = {kafka_offessageKreceived = null, kafka_receivedPartitionId = 0, kafka_receivedTopic = demotopic}], failedMessage = GenericMessage [payload = test, headers = {kafka_offset = 57, kafka_receivedMessageKey = null, kafka_receivedPartitionId] deceivedPartitionId = 0, kafka_receivedPartitionId}

Пожалуйста помоги. TIA.


person Aravind Kv    schedule 27.06.2017    source источник


Ответы (1)


Вы должны установить containerProperties ackMode фабрики контейнеров на MANUAL или MANUAL_IMMEDIATE, чтобы получить Acknowledgment объект.

В других режимах подтверждения за фиксацию смещения отвечает контейнер.

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE)

Или установите свойство ....ackMode при использовании Spring Boot

person Gary Russell    schedule 27.06.2017
comment
2.0 будет генерировать менее неясное исключение - спасибо, что указали на это. - person Gary Russell; 28.06.2017
comment
Теперь это не вызывает исключения. Большое спасибо. Ценю вашу помощь. - person Aravind Kv; 28.06.2017
comment
Следующий выпуск вызовет более значимое исключение. new IllegalStateException("No Acknowledgment availailable as an argument, the listener container must have a MANUAL Ackmode to populate the Acknowledgment.",. Спасибо за указание на это. - person Gary Russell; 28.06.2017
comment
Может кто-нибудь сказать мне, где установить вышеупомянутое свойство ackMode для Spring Boot. - person abb; 01.08.2020
comment
Не задавайте новых вопросов в комментариях к очень старым ответам. spring.kafka.listener.ack-mode - docs.spring.io/spring-boot/docs/2.3.2.RELEASE/reference/html/ - person Gary Russell; 01.08.2020
comment
@abb factory.getContainerProperties (). setAckMode (ContainerProperties.AckMode.MANUAL_IMMEDIATE); - person abbas; 02.06.2021