Как избежать отключения SimpleMessageListenerContainer в случае непредвиденных ошибок?

Я использую Java boot 1.4.0 и spring-boot-starter-amqp для подключения к rabbitMq. Производитель сообщений, потребитель и сервер rabbitMq находятся под моим контролем. В течение нескольких месяцев все работало нормально. Но вдруг мой потребитель остановился с исключениями, указанными ниже. Поскольку я создаю только сообщения, которые всегда действительны, я понятия не имел, что пошло не так.

Но это привело к отключению моего контейнера-слушателя. И, следовательно, моя обработка сообщений остановилась. Мне пришлось вручную перезапустить мою программу приема сообщений.

Итак, мои вопросы:

  1. Можем ли мы избежать полного отключения контейнера-слушателя в любом неожиданном случае?
  2. Можно ли изящно отбросить такое сообщение и сохранить контейнер-слушатель живым?
  3. Если нет, то есть ли способы, с помощью которых я могу проверить, работают ли все мои контейнеры-слушатели, и запустить их, если я найду их мертвыми? (ПРИМЕЧАНИЕ: я просмотрел RabbitListenerEndpointRegistry.getListenerContainers(). Но похоже, что он не распространяется на контейнеры SimpleMessageListenerContainer.)

Журнал исключений:

2017-02-20 12:42:18.441 ERROR 18014 --- [writeToDBQQueueListenerContainer-17] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.
java.lang.NoClassDefFoundError: org/springframework/messaging/handler/annotation/support/MethodArgumentNotValidException
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.causeIsFatal(ConditionalRejectingErrorHandler.java:110) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy.isFatal(ConditionalRejectingErrorHandler.java:97) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:72) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:625) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:852) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:685) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.ClassNotFoundException: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_91]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
    at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:89) ~[KattaQueueManager-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
    ... 11 common frames omitted

One more exception:

2017-02-20 12:42:18.674 ERROR 18014 --- [imageQueueListenerContainer-53] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.

java.lang.NoClassDefFoundError: com/rabbitmq/utility/Utility
        at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.checkShutdown(BlockingQueueConsumer.java:348) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:402) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1160) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.utility.Utility
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_91]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
        at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:89) ~[KattaQueueManager-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
        ... 7 common frames omitted

2017-02-20 12:42:18.675 ERROR 18014 --- [imageQueueListenerContainer-53] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer

Мой пример потребительского кода:

@Bean
public MessageConverter jsonMessageConverter(){
    //return new JsonMessageConverter();
    Jackson2JsonMessageConverter converter =  new   Jackson2JsonMessageConverter();

    converter.setClassMapper(new ClassMapper() {
        @Override
        public Class<?> toClass(MessageProperties properties) {
            return String.class;
        }

        @Override
        public void fromClass(Class<?> clazz, MessageProperties properties) {
        }
    });

    return converter;
}

@Bean
public ConnectionFactory connectionFactory() 
{
    CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory(_rabbitmqHost, _rabbitmqPort);
    return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate() 
{
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(jsonMessageConverter());
    return rabbitTemplate;
}

@Bean
TopicExchange exchange() 
{
    return new TopicExchange("MyExchange");
}

@Bean
public Queue mainQueue() 
{
    return new Queue("MyMainQ");
}

@Bean
public Binding mainRouteBinding() 
{
    return BindingBuilder.bind(mainQueue()).to(exchange()).with("MyMainQ");
}

@Bean
SimpleMessageListenerContainer mainQueueListenerContainer(
        ConnectionFactory connectionFactory, 
        @Qualifier("mainQueueListenerAdapter") MessageListenerAdapter listenerAdapter) 
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueues(mainQueue());

    container.setMessageConverter(jsonMessageConverter());

    container.setMessageListener(listenerAdapter);
    container.setConcurrentConsumers(1);
    return container;
}

@Bean
MessageListenerAdapter mainQueueListenerAdapter(MainConsumer receiver) 
{
    MessageListenerAdapter msgAdapter = new MessageListenerAdapter(receiver, "receiveMessage");

    msgAdapter.setMessageConverter(jsonMessageConverter());

    return msgAdapter;
}

@Bean
MainConsumer getMainConsumer()
{
    return new MainConsumer();
}

//
//The receiving method in MainConsumer class looks as given below
public void receiveMessage(String message) 
{
     // My business logic goes here ...
}

person Amol Potnis    schedule 28.02.2017    source источник
comment
Из трассировки стека кажется, что в пути к классам отсутствует несколько классов/банок, проверьте их, существуют ли они/включены в путь к классу   -  person Bond - Java Bond    schedule 28.02.2017
comment
Спасибо за комментарий, Бонд. Это полная рабочая установка, работающая в производстве в течение нескольких месяцев. Определенно все необходимые компоненты/баночки на месте.   -  person Amol Potnis    schedule 28.02.2017
comment
хорошо.. вы видите какие-либо ExceptionInInitializerError в журналах?   -  person Bond - Java Bond    schedule 28.02.2017
comment
Нет. Это произошло только для одного входного сообщения. Поэтому я хочу знать, как избежать отключения моего контейнера-слушателя в таких непредвиденных случаях.   -  person Amol Potnis    schedule 28.02.2017
comment
@GaryRussell, я видел ваши комментарии и предложения по многим другим вопросам. Не могли бы вы помочь мне здесь?   -  person Amol Potnis    schedule 01.03.2017
comment
У меня точно такая же проблема. Вы нашли какое-нибудь решение?   -  person Dev    schedule 22.07.2020


Ответы (1)


У меня были такие же проблемы пару месяцев назад, и это помогло мне. Если версия вашего Java-клиента Rabbit ниже 4.0.0, у вас нет соединения для восстановления автоматически, вам нужно установить, как здесь:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);

// connection that will recover automatically
factory.setAutomaticRecoveryEnabled(true);

// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);

Connection conn = factory.newConnection();

Попробуйте проверить документацию по RabbitMQ: Rabbit Client API

person Brother    schedule 12.05.2017
comment
Спасибо брат. Я использую spring-boot-starter-amqp из версии Spring Boot 1.4.0. Насколько я понимаю, по умолчанию в нем включены основные функции повторного подключения, чтобы позаботиться об основных разрывах соединения. Так что в основном больше нечего настраивать на этом фронте. - person Amol Potnis; 07.06.2017