Spring-boot-starter RabbitMQ глобальная обработка ошибок

Я использую spring-boot-starter-amqp 1.4.2. Производитель и потребитель работают нормально, но иногда входящие сообщения JSON имеют неправильный синтаксис. Это приводит к следующему (правильному) исключению:

org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
Caused by:  org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...

В будущем я могу столкнуться с гораздо большим количеством исключений. Итак, я хочу настроить глобальный обработчик ошибок, чтобы при возникновении исключения у любого потребителя я мог обрабатывать его глобально.

Примечание: в этом случае сообщение вообще не доходит до потребителя. Я хочу обрабатывать такие исключения глобально для потребителя.

Пожалуйста, найдите приведенный ниже код:

RabbitConfiguration.java

@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public MessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    @Primary
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

}

Потребитель

@RabbitListener(
        id = "book_queue",
        bindings = @QueueBinding(
                value = @Queue(value = "book.queue", durable = "true"),
                exchange = @Exchange(value = "book.exchange", durable = "true", delayed = "true"),
                key = "book.queue"
        )
    )
public void handle(Message message) {
//Business Logic
}

Может ли кто-нибудь помочь мне справиться с обработчиком ошибок во всем мире. Ваша помощь должна быть заметной.

Обновленный вопрос согласно комментарию Гэри

Я могу запустить ваш пример и получить ожидаемый результат, как вы сказали, я просто хочу попробовать еще несколько отрицательных случаев на основе вашего примера, но я не мог понять несколько вещей,

this.template.convertAndSend(queue().getName(), new Foo("bar"));

вывод

Получено: Foo [foo = bar]

Приведенный выше код работает нормально. Теперь вместо «Foo» я отправляю другой bean-компонент.

this.template.convertAndSend(queue().getName(), new Differ("snack","Hihi","how are you"));

вывод

Получено: Foo [foo = null]

Потребитель не должен принимать это сообщение, потому что это совершенно другой bean-компонент (Differ.class, а не Foo.class), поэтому я ожидаю, что он должен перейти в «ConditionalRejectingErrorHandler». Почему он принимает неправильную полезную нагрузку и печатает как null? Пожалуйста, поправьте меня, если я ошибаюсь.

Изменить 1:

Гэри, как вы сказали, я установил заголовок «TypeId» при отправке сообщения, но все же потребитель может конвертировать неправильные сообщения, и это не вызывает никаких ошибок ... пожалуйста, найдите код ниже, я использовали ваши образцы кода и только что внесли следующие изменения,

1) Добавлен "__TypeId__" при отправке сообщения

this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"),m -> {
        m.getMessageProperties().setHeader("__TypeId__","foo");
        return m;
    }); 

2) Добавлен DefaultClassMapper в Jackson2JsonMessageConverter

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return new Jackson2JsonMessageConverter();
}    

person VelNaga    schedule 13.02.2017    source источник


Ответы (1)


Переопределить компонент фабрики контейнера слушателя загрузки, как описано в Включить аннотации конечных точек прослушивателя.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(myErrorHandler());
    ...
    return factory;
}

Вы можете внедрить собственную реализацию ErrorHandler, которая будет добавлена ​​в каждый контейнер слушателя, создаваемый фабрикой.

void handleError(Throwable t);

Выбрасываемым будет ListenerExecutionFailedException, который, начиная с версии 1.6.7 (загрузка 1.4.4), имеет необработанное входящее сообщение в свойстве failedMessage.

Обработчик ошибок по умолчанию считает такие причины, как MessageConversionException, критическими (они не будут повторно добавляться в очередь).

Если вы хотите сохранить это поведение (нормальное для таких проблем), вы должны выдать AmqpRejectAndDontRequeueException после обработки ошибки.

Между прочим, вам не нужен этот RabbitTemplate bean; если у вас есть только один MessageConverter bean-компонент в приложении, загрузка автоматически подключит его к контейнерам и шаблону.

Поскольку вы переопределите заводскую настройку загрузки, вам потребуется подключить преобразователь туда.

ИЗМЕНИТЬ

Вы можете использовать значение по умолчанию ConditionalRejectingErrorHandler, но добавить его с помощью специальной реализации FatalExceptionStrategy. Фактически, вы можете создать подкласс DefaultExceptionStrategy и переопределить isFatal(Throwable t), а затем, после обработки ошибки, вернуть super.isFatal(t).

РЕДАКТИРОВАТЬ2

Полный пример; отправляет 1 хорошее сообщение и 1 плохое:

package com.example;

import org.slf4j.Logger;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class So42215050Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar"));
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties());
        });
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        System.out.println("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }
}

Результат:

Received: Foo [foo=bar]

2017-02-14 09: 42: 50.972 ОШИБКА 44868 --- [cTaskExecutor-1] 5050Application $ MyFatalExceptionStrategy: не удалось обработать входящее сообщение из очереди So42215050; сообщение об ошибке: (Body: 'some bad json' MessageProperties [headers = {TypeId = com.example.So42215050Application $ Foo}, timestamp = null, messageId = null, userId = null, receiveUserId = null, appId = null, clusterId = null, type = null, correlationId = null, correlationIdString = null, replyTo = null, contentType = application / json, contentEncoding = UTF-8, contentLength = 0, deliveryMode = null, receiveDeliveryMode = PERSISTENT, expiration = null, priority = 0, redelivered = false, receiveExchange =, receiveRoutingKey = So42215050, receiveDelay = null, deliveryTag = 2, messageCount = 0, consumerTag = amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue = So42215050])

РЕДАКТИРОВАТЬ3

JSON не передает никакой информации о типе. По умолчанию тип, в который нужно преобразовать, будет выведен из типа параметра метода. Если вы хотите отклонить все, что не может быть преобразовано в этот тип, вам необходимо соответствующим образом настроить конвертер сообщений.

Например:

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return converter;
}

Теперь, когда я изменяю свой пример, отправляя Bar вместо _19 _...

public static class Bar {

   ...

}

и

this.template.convertAndSend(queue().getName(), new Bar("baz"));

Я получил...

Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]

Но это работает, только если отправитель устанавливает заголовок __TypeId__ (что и делает шаблон, если он настроен с тем же адаптером).

EDIT4

@SpringBootApplication
public class So42215050Application {

    private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
        });
        Message message = MessageBuilder
                .withBody("{\"foo\":\"bar\"}".getBytes())
                .andProperties(
                        MessagePropertiesBuilder
                            .newInstance()
                            .setContentType("application/json")
                            .build())
                .build();
        this.template.send(queue().getName(), message); // Success - default Foo class when no header
        message.getMessageProperties().setHeader("__TypeId__", "foo");
        this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
        message.getMessageProperties().setHeader("__TypeId__", "bar");
        this.template.send(queue().getName(), message); // fail - mapped to a Map
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        logger.info("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setDefaultType(Foo.class);
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("foo", Foo.class);
        mappings.put("bar", Object.class);
        mapper.setIdClassMapping(mappings);
        converter.setClassMapper(mapper);
        return converter;
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

    public static class Bar {

        private String foo;

        public Bar() {
            super();
        }

        public Bar(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Bar [foo=" + this.foo + "]";
        }

    }

}
person Gary Russell    schedule 13.02.2017
comment
Большое спасибо за ваш ответ. Нужны еще несколько разъяснений, которых я не понимаю из вашего ответа. 1) Чтобы реализовать глобальный обработчик ошибок, у нас должен быть bean-компонент SimpleRabbitListenerContainerFactory? (Другого пути нет) 2) Я вижу ErrorHandler как метод. Можно ли определить bean-компонент как ErrorHandler? Не могли бы вы поделиться образцом кода для эффективного способа написания ErrorHandler? Хотя бы подсказку или ссылку. 3) Вы упомянули, что наличие одного MessageConverter является избыточным, и загрузка автоматически подключит его к контейнерам. Для меня Spring-Boot не делает этого автоматически, я что-то упускаю? - person VelNaga; 14.02.2017
comment
Теперь это доступно в spring-amqp-samples как spring-rabbit-global-errorhandler. - person Gary Russell; 14.02.2017
comment
Большое спасибо. Позвольте мне попробовать, когда все заработает, я сразу приму ответ - person VelNaga; 14.02.2017
comment
Извините за очень поздний ответ ... Ваш пример работает нормально, но если я передаю this.template.convertAndSend (queue (). GetName (), new Differ (закуска, привет, как дела)); Кролик принимает это полезная нагрузка, но она должна отклониться, потому что мой потребитель ожидает Foo, но я передаю Differ, который представляет собой другой класс и разные свойства, могу ли я узнать, почему он не поступает в ConditionalRejectingErrorHandler? - person VelNaga; 02.03.2017
comment
Не помещайте код в комментарии, он нечитабелен - вместо этого отредактируйте свой вопрос. Непонятно, о чем вы спрашиваете; вы не можете получить ошибку в template.send, потому что отправляющая сторона не знает, чего хочет потребитель. Если вы имеете в виду, что не видите его и на стороне потребителя, приложите журнал DEBUG для доставки. - person Gary Russell; 02.03.2017
comment
Я обновил свой вопрос, теперь он будет ясен. Я не ожидаю ошибки от template.send, но я ожидаю, что она должна перейти в ConditionalRejectingErrorHandler, поскольку это неправильная полезная нагрузка. - person VelNaga; 02.03.2017
comment
Это потому, что тип назначения выводится из параметра метода. JSON неявно не передает информацию о типе. См. Мою следующую правку, чтобы узнать, как настроить конвертер для преобразования только в определенный тип. - person Gary Russell; 03.03.2017
comment
Большое спасибо за ваш ответ. Таким образом, нам нужно установить информацию typeId от отправителя, которую вы также настраиваете в конвертере, который является глобальным для всех потребителей в компонентах, предположим, если мои компоненты содержат несколько потребителей, и каждый потребитель может потреблять разные полезные нагрузки, тогда это решение не будет работать. есть ли способ настроить этот тип информации в аннотации @RabbitListener или при объявлении очереди? - person VelNaga; 03.03.2017
comment
С JSON вы должны указать конвертеру, во что конвертировать json. Мы стараемся помочь (например, определяя тип по методу или по отправителю, отправляющему информацию о типе в заголовках), но вы всегда можете настроить свой собственный конвертер. Если вам нужно несколько типов, то каждый @RabbitListener должен быть создан другим SimpleRabbitListenerContainerFactory bean (каждый из которых может быть настроен для внедрения другого конвертера). Вы можете указать, какую фабрику использовать, с атрибутом containerFactory на @RabbitListener. - person Gary Russell; 03.03.2017
comment
В качестве альтернативы вы можете получить доступ к контейнерам (по идентификатору) из bean-объекта RabbitListenerEndpointRegistry; и меняйте преобразователь в каждом контейнере перед их запуском. - person Gary Russell; 03.03.2017
comment
Еще раз большое спасибо за ваш ответ. Позвольте мне поподробнее об этом. Между тем, с ConditionRejectingErrorHandler, помимо недопустимой полезной нагрузки, можем ли мы поймать любое другое исключение? Также не могли бы вы поделиться ссылкой для настройки обмена мертвыми письмами для ConditionRejectingErrorHandler? - person VelNaga; 03.03.2017
comment
Я установил заголовок TypeId при отправке сообщения и настроил DefaultClassMapper на принимающей стороне, но все же мои потребители потребляют неправильную полезную нагрузку. Правильно ли я устанавливаю TypeId? Пожалуйста, помогите мне решить эту проблему - person VelNaga; 08.03.2017
comment
Когда я устанавливаю какое-то поддельное имя класса, я получаю Caused by: java.lang.ClassNotFoundException: foo. Если я добавлю mapper.setIdClassMapping(Collections.singletonMap("foo", Object.class)); в сопоставитель классов, я получу Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.example.So42215050Application$Foo] for GenericMessage [payload={foo=baz},... - person Gary Russell; 08.03.2017
comment
@Gary ... нам нужно установить заголовок TypeId в отправителе? также нам нужно установить свойство DefaultType в DefaultClassMapper? - person VelNaga; 08.03.2017
comment
@ Gary .. к сожалению, у меня это не работает .. Я установил idClassMapping, но мой потребитель все равно получает сообщение и распечатывает его как null - person VelNaga; 08.03.2017
comment
Все, что я могу сделать, это сделать репост своего образца, который показывает, что он работает так, как я описываю. См. EDIT4. - person Gary Russell; 08.03.2017
comment
@Gary ... Извините, это моя ошибка, вместо того, чтобы вернуть измененный Jackson2JsonMessageConverter, я вернул новый экземпляр Jackson2JsonMessageConverter, поэтому он не генерирует никаких исключений, теперь он работает безупречно. - person VelNaga; 08.03.2017
comment
@GaryRussell В Редактировать 2 - Зачем вы создаете фабрику компонентов? - person ; 26.03.2017
comment
Spring Boot создает его в ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args); - person Gary Russell; 26.03.2017
comment
public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {, это выдает ошибку компиляции, поскольку DefaultExceptionStrategy является частным внутренним классом. Как решить такую ​​же? - person Paramesh Korrakuti; 14.06.2017
comment
как я могу скрыть исключение, которое использовалось для повторного сообщения! stackoverflow.com/questions/50350377/ - person ; 15.05.2018