Как использовать метод Spring Kafka Acknowledgement.acknowledge () для фиксации вручную

Я впервые использую Spring Kafka, и я не могу использовать метод Acknowledgement.acknowledge () для ручной фиксации в моем пользовательском коде, как указано здесь https://docs.spring.io/spring-kafka/reference/html/_reference.html#committing-offsets. У меня приложение с пружинной загрузкой. Если я не использую ручную фиксацию, мой код работает нормально. Но когда я использую Acknowledgement.acknowledge () для фиксации вручную, он показывает ошибку, связанную с bean-компонентом. Также, если я не использую ручную фиксацию должным образом, предложите мне правильный способ сделать это.

Сообщение об ошибке:

***************************
APPLICATION FAILED TO START
***************************

Description:

Field ack in Receiver required a bean of type 'org.springframework.kafka.support.Acknowledgment' that could not be found.


Action:

Consider defining a bean of type 'org.springframework.kafka.support.Acknowledgment' in your configuration.

Я погуглил эту ошибку, я обнаружил, что мне нужно добавить @Component, но он уже присутствует в моем потребительском коде.

Мой потребительский код выглядит так: Receiver.java

import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    @Autowired
    public Acknowledgment ack;

    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "${kafka.topic.TestTopic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord){
            System.out.println(consumerRecord.value());
            latch.countDown();
            ack.acknowledge();
    }
}

Код моего производителя выглядит так: Sender.java

import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, Map<String, Object>> kafkaTemplate;

    public void send(Map<String, Object> map){
            kafkaTemplate.send("TestTopic", map);

    }

}

ИЗМЕНИТЬ 1:

Мой новый потребительский код выглядит так: Receiver.java

import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "${kafka.topic.TestTopic}", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack){
            System.out.println(consumerRecord.value());
            latch.countDown();
            ack.acknowledge();
    }
}

Я также изменил свой класс конфигурации:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() throws SendGridException {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
            return props;

    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /*@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }*/

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

После добавления containerFactory = "kafkaManualAckListenerContainerFactory" в мой метод receive () я получаю следующую ошибку.

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory'


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

person kumarhimanshu449    schedule 20.09.2017    source источник


Ответы (2)


Тем, кто все еще ищет решение этих ошибок, касающихся ручного подтверждения, вам не нужно указывать containerFactory = "kafkaManualAckListenerContainerFactory", вместо этого вы можете просто добавить:

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

в конфигурацию приемника непосредственно перед возвратом фабричного объекта.

Тогда вам также понадобятся:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

в реквизитах конфигурации потребителя.

В итоге ваш метод слушателя может просто выглядеть так:

@KafkaListener(topics = "${spring.kafka.topic}")
    private void listen(@Payload String payload, Acknowledgment acknowledgment) {
        //Whatever code you want to do with the payload
        acknowledgement.acknowledge(); //or even pass the acknowledgment to a different method and acknowledge even later
    }
person Zim8662    schedule 30.01.2019
comment
Есть ли у кого-нибудь минимальный рабочий пример всего рабочего приложения, использующего этот шаблон? - person Jay Sullivan; 12.02.2019
comment
Конечно, я создал для вас этот простой проект, взгляните на мою страницу git: github.com/zim8662 / kafka-example Просто запустите сервер zookeeper и kafka локально или, если он удаленно, измените свойство в application.properties (вместе со свойствами группы и темы, конечно) и попробуйте localhost: 8080 / test в своем браузере. - person Zim8662; 13.02.2019
comment
Привет, Зим! Интересно, что будет, если затем установить props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)? Будет ли kafka выполнять автоматическую фиксацию? и будет ли Spring снова пытаться выполнить фиксацию? - person alex; 22.05.2020
comment
@alex Я думаю, что это простое, но хорошее объяснение этого свойства: medium.com/@danieljameskay/ - person Zim8662; 14.07.2020
comment
Я думаю, что ваш ответ плохой, поскольку изменение затронет всех слушателей Kafka, привязанных к ListenerContainerFactory. - person Jack Zhu; 22.04.2021
comment
Когда мы обрабатываем подтверждение вручную, нам нужно установить ContainerProperties.AckMode.MANUAL_IMMEDIATE и map.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false). В противном случае мы получим следующее исключение. org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: не удалось разрешить параметр метода с индексом 0 в общедоступной void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation (java.lang .String, java.lang.String): 1 ошибка (и): [Ошибка в объекте 'payload': коды []; arguments []; - person MR_K; 13.05.2021

Вам действительно следует следовать документации:

При использовании ручного AckMode слушатель также может быть снабжен Acknowledgment; в этом примере также показано, как использовать другую фабрику контейнеров.

@KafkaListener(id = "baz", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

На самом деле нигде не указано, что Acknowledgment - это bean-компонент. Итак, измените подпись вашего метода receive() @KafkaListener соответствующим образом и удалите этот @Autowired для подозрительного Acknowledgment bean - его просто не существует, потому что этот объект является частью (заголовком) каждого полученного сообщения.

person Artem Bilan    schedule 20.09.2017
comment
Привет, @Artem, я внес изменения, но теперь получаю еще одну ошибку bean-компонента. Я обновляю свой вопрос новым Edit. Документация spring kafka не так уж и понятна. - person kumarhimanshu449; 21.09.2017
comment
1. В вашем receive() методе все еще нет Acknowledgment arg. Итак, я не уверен, что это за ack var. 2. У вас есть смесь необработанной конфигурации Spring Kafka и Spring Boot. Вам следует подумать о том, чтобы начать с этого: docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/ и просто перенастройте готовый компонент kafkaListenerContainerFactory с помощью соответствующего ConsumerFactory bean. В вашем случае это ConsumerFactory<String, String>, но должно быть ConsumerFactory<?, ?> - person Artem Bilan; 21.09.2017
comment
Сказать, что вы должны следовать документации, непродуктивно; документы здесь дико сбивают с толку и не подходят для новичков. Было бы неплохо дать ссылку на минимально работающее приложение. - person Jay Sullivan; 12.02.2019
comment
@ArtemBilan Ссылка на документацию не работает. - person Lakshmikant Deshpande; 11.06.2019