Обработка ошибок с помощью @KafkaListener

Я использую spring-kafka со следующей конфигурацией:

package com.danigu.fancypants.infrastructure;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;

/**
 * @author dani
 */
@Data
@EnableKafka
@Configuration
@Import({KafkaConfigurationProperties.class})
public class KafkaConfiguration {
    @Inject KafkaConfigurationProperties kcp;

    protected Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kcp.getBrokerAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kcp.getGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

    @Bean
    public StringJsonMessageConverter stringJsonMessageConverter(ObjectMapper mapper) {
        return new StringJsonMessageConverter(mapper);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            StringJsonMessageConverter messageConverter) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setMessageConverter(messageConverter);
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        factory.setRetryTemplate(retryTemplate());

        return factory;
    }

    /*
     * Retry template.
     */

    protected RetryPolicy retryPolicy() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy();
        policy.setMaxAttempts(3);
        return policy;
    }

    protected BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);
        return policy;
    }

    protected RetryTemplate retryTemplate() {
       RetryTemplate template = new RetryTemplate();

       template.setRetryPolicy(retryPolicy());
       template.setBackOffPolicy(backOffPolicy());

       return template;
    }
}

А мой слушатель выглядит так:

package com.danigu.fancypants.integration.inbound.dress;

import com.danigu.fancypants.integration.inbound.InvalidRequestException;
import com.danigu.fancypants.integration.inbound.dress.payload.DressRequest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
import java.util.Set;

/**
 * @author dani
 */
@Component
public class DressListener {

    @Inject protected Validator validator;

    @KafkaListener(topics = {"${kafka.dressesTopic}"})
    public void onMessage(@Payload DressRequest request, Acknowledgment acknowledgment) {
        assertValidRequest(request);

        System.out.println(request);

        acknowledgment.acknowledge();
    }

    protected void assertValidRequest(DressRequest request) {
        final Set<ConstraintViolation<DressRequest>> violations = validator.validate(request);

        if(!violations.isEmpty()) {
            throw new InvalidRequestException(violations, request);
        }
    }
}

До сих пор я просматривал тесты и справочную документацию spring-kafka, там в документации говорится, что ErrorHandler для соответствующего типа должен быть настроен, это test подразумевает, что я должен настроить его на ContainerProperties, хотя в моем использовании это только один обработчик ошибок. В случае, я хотел бы определить несколько (для разных типов полезной нагрузки), возможно ли это, если да, как?

Кроме того, есть ли способ описать, какой обработчик ошибок использовать для аннотированного слушателя void?

Кроме того, есть ли способ описать RecoveryCallback по @KafkaListener или, может быть, по разным темам, или для этого должны быть разные ListenerContainerFactory?

Я могу ошибиться, может ли кто-нибудь указать мне правильное направление, как мне правильно настроить несколько ErrorHandler для разных типов полезной нагрузки?


person Daniel Gulyas    schedule 24.03.2017    source источник


Ответы (1)


Я не уверен, что вы имеете в виду под «разными типами полезной нагрузки», поскольку у вас есть только один @KafkaListener. @KafkaListener на уровне класса может иметь @KafkaHandler на уровне метода для разных типов полезной нагрузки.

В любом случае для каждого контейнера существует только один обработчик ошибок, поэтому вам потребуется отдельная фабрика контейнеров для каждого обработчика ошибок (то же самое для обратного вызова восстановления).

Недавно мы добавили errorHandler на @RabbitListener в spring-amqp ...

/**
 * Set an {@link RabbitListenerErrorHandler} to invoke if the listener method throws
 * an exception.
 * @return the error handler.
 * @since 2.0
 */
String errorHandler() default "";

... так что каждый метод может иметь свой собственный обработчик ошибок.

Вероятно, мы сделаем что-то подобное в следующем выпуске spring-kafka. Но по-прежнему будет только один для каждого @KafkaListener, поэтому для @KafkaListener уровня класса это не поможет.

person Gary Russell    schedule 24.03.2017
comment
Привет, Гэри! Прежде всего, спасибо за spring-kafka, ваш рассказ о S1P и ваш ответ! Мой странно сформированный вопрос в основном спрашивал, могу ли я использовать разные обработчики ошибок для разных @KafkaListeners (по разным темам и с разными типами сообщений) без создания разных фабрик, на что вы ответили, спасибо! Я думаю, что добавление аннотированного параметра - хорошее решение для этого, я посмотрю на средство отслеживания проблем Spring, если я могу помочь с этим. Еще раз спасибо, приятных выходных! :) - person Daniel Gulyas; 26.03.2017
comment
Мы используем выпуски GitHub для этого проекта; Я открыл для этого # 256. Взносы приветствуются; изменение, вероятно, будет похоже на часть обработки ошибок в this rabbitmq commit >. - person Gary Russell; 26.03.2017