Spring AMQP v1.4.2 - проблема повторного подключения кролика при сбое сети

Я тестирую следующий сценарий в Spring AMQP v1.4.2, и он не может повторно подключиться после сбоя сети:

  1. Запустите приложение spring, которое асинхронно потребляет сообщения, используя rabbit: listener-container и rabbit: connection-factory (подробная конфигурация приводится ниже).
  2. Журнал показывает, что приложение успешно принимает сообщения.
  3. Сделайте RabbitMQ невидимым для приложения, отключив входящий сетевой трафик на сервере Rabbit: sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROP
  4. Подождите не менее 3 минут (для сетевых подключений истечет время ожидания).
  5. Исправьте соединение с: sudo iptables -D INPUT -p tcp --destination-port 5672 -j DROP
  6. Подождите некоторое время (даже пробовал больше часа), и повторного подключения не происходит.
  7. Перезапустите приложение, и оно снова начнет получать сообщения, что означает, что сеть вернулась в нормальное состояние.

Я также протестировал тот же сценарий с отключением сетевого адаптера виртуальной машины вместо отключения iptables, и происходит то же самое, то есть без автоматического переподключения. Интересно, что когда я пробую iptables REJECT вместо DROP, он работает должным образом, и приложение перезапускается, как только я удаляю правило отклонения, но я думаю, что отклонение больше похоже на сбой сервера, чем на сбой сети. .

Согласно справочному документу:

Если MessageListener выходит из строя из-за бизнес-исключения, исключение обрабатывается контейнером прослушивателя сообщений, а затем он возвращается к прослушиванию другого сообщения. Если сбой вызван обрывом соединения (а не исключением для бизнеса), то потребителя, который собирает сообщения для слушателя, необходимо отменить и перезапустить. SimpleMessageListenerContainer обрабатывает это без проблем и оставляет журнал, в котором говорится, что прослушиватель перезапускается. Фактически, он бесконечно зацикливается, пытаясь перезапустить потребителя, и только если потребитель ведет себя очень плохо, действительно ли он сдаться. Один из побочных эффектов заключается в том, что если брокер не работает при запуске контейнера, он будет продолжать попытки до тех пор, пока не будет установлено соединение.

Это журнал, который я получаю примерно через минуту после отключения:

    2015-01-16 14:00:42,433 WARN  [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
    ... 1 common frames omitted

И я получаю это сообщение журнала через несколько секунд после переподключения:

2015-01-16 14:18:14,551 WARN  [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out

ОБНОВЛЕНИЕ: Как ни странно, когда я включаю ведение журнала DEBUG в пакете org.springframework.amqp, переподключение происходит успешно, и я больше не могу воспроизвести проблему!

Не включив ведение журнала отладки, я попытался отладить код Spring AMQP. Я заметил, что вскоре после удаления iptables drop вызывается метод SimpleMessageListenerContainer.doStop(), который, в свою очередь, вызывает shutdown () и отменяет все каналы. Я также получил это сообщение журнала, когда поставил точку останова на doStop (), которая, похоже, связана с причиной:

2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    ... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer

ОБНОВЛЕНИЕ 2: после установки requested-heartbeat на 30 секунд, как было предложено в ответе, повторное подключение работало большую часть времени и успешно переопределило эксклюзивную временную очередь, привязанную к обмену разветвлениями, но это все еще не удалось повторно подключайтесь время от времени.

В тех редких случаях, когда это не удалось, я наблюдал за консолью управления RabbitMQ во время теста и наблюдал, что новое соединение было установлено (после того, как старое соединение было удалено по таймауту), но эксклюзивная временная очередь не была переопределена после повторного подключения. Также клиент не получал никаких сообщений. Сейчас действительно сложно воспроизвести проблему достоверно, так как это случается реже. Я предоставил полную конфигурацию ниже, теперь содержащую объявления очереди.

ОБНОВЛЕНИЕ 3: даже после замены монопольной временной очереди именованной очередью с автоматическим удалением иногда происходит то же самое; т.е. именованная очередь с автоматическим удалением не переопределяется после повторного подключения, и сообщения не принимаются до перезапуска приложения.

Я был бы очень признателен, если бы кто-нибудь мог мне в этом помочь.

Вот весенняя конфигурация AMQP, на которую я полагаюсь:

<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>

<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
    <rabbit:bindings>
        <rabbit:binding queue="control-queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="none"
                           concurrency="1"
                           prefetch="1">
    <rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>

</rabbit:listener-container>

<rabbit:connection-factory id="connection-factory"
                           username="${rabbit.username}"
                           password="${rabbit.password}"
                           host="${rabbit.host}"
                           virtual-host="${rabbit.virtualhost}"
                           publisher-confirms="true" 
                           channel-cache-size="100"
                           requested-heartbeat="30" />

<rabbit:admin id="admin" connection-factory="connection-factory"/>

<rabbit:queue id="qu0-id" name="qu0">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dead-letter"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
    <rabbit:bindings>
        <rabbit:binding queue="qu0" pattern="p.0"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="manual"
                           concurrency="4"
                           prefetch="30">
    <rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>

person Amir Moghimi    schedule 16.01.2015    source источник
comment
Разве вы не имеете в виду, что в самых ранних версиях Spring AMQP такой проблемы нет?   -  person Artem Bilan    schedule 16.01.2015
comment
Не могли бы вы поделиться журналами для категории org.springframework.amqp.rabbit.listener на уровне DEBUG, чтобы получить дополнительную информацию по этому вопросу? Кстати, я просто пробовал аналогичную (или нет?) Эмуляцию с tcpTrace в Windows и вижу похожие Caused by: java.io.EOFException: null at java.io.DataInputStream.readUnsignedByte в журналах. Но когда я перезапускаю trace, соединение восстанавливается. Мой клиент AMQP 3.4.2 - транзитивная зависимость от Spring AMQP.   -  person Artem Bilan    schedule 16.01.2015
comment
Не относится к Spring AMQP, но вы можете попробовать использовать Lyra вместо этого, если есть возможность повторно подключаться и восстанавливать ресурсы, например очереди - это то, что вам нужно.   -  person Jonathan    schedule 31.01.2015


Ответы (3)


Я только что выполнил ваш тест, как описано (кролик в Linux использует iptables для отбрасывания пакетов).

Когда соединение восстанавливается, журнала нет (возможно, стоит).

Я предлагаю вам включить ведение журнала отладки, чтобы увидеть переподключение.

РЕДАКТИРОВАТЬ:

Из документации rabbitmq:

Эксклюзивные эксклюзивные очереди могут быть доступны только для текущего соединения и удаляются, когда это соединение закрывается. Пассивное объявление эксклюзивной очереди другими соединениями не допускается.

Из вашего исключения:

код-ответа = 405, текст-ответа = RESOURCE_LOCKED - невозможно получить монопольный доступ к заблокированной очереди 'e4288669-2422-40e6-a2ee-b99542509273' в vhost '/', class-id = 50, method-

Таким образом, проблема в том, что брокер все еще считает, что существует другая связь.

  1. Не используйте эксклюзивную очередь (вы все равно потеряете сообщения с такой очередью). Или,
  2. Установите низкий requestedHeartbeat, чтобы брокер быстрее обнаружил потерянное соединение.
person Gary Russell    schedule 16.01.2015
comment
Спасибо, Гэри. Я попробовал вести журнал отладки и обновил вопрос, добавив дополнительную информацию. Кажется, что вскоре после повторного подключения повторное объявление очереди завершается ошибкой и закрывает SimpleMessageListenerContainer. - person Amir Moghimi; 20.01.2015
comment
Я отредактировал ответ; в будущем, пожалуйста, показывайте всю конфигурацию, включая вашу очередь (очереди). - person Gary Russell; 20.01.2015
comment
Сожалею, что не удалось восстановить подключение, но на этот раз время от времени. Я обновил вопрос с подробностями и полной конфигурацией. - person Amir Moghimi; 23.01.2015
comment
Как я уже говорил ранее, не используйте эксклюзивные очереди, если вы хотите иметь дело с этим конкретным сбоем в сети при любых обстоятельствах. Если вас не волнуют потерянные сообщения, просто используйте очередь автоматического удаления (но не исключительную) или уменьшите requestedHeartbeat до еще меньшего значения. Или используйте именованную очередь, и в этом случае вы не столкнетесь с этой проблемой. - person Gary Russell; 23.01.2015
comment
Кроме того, я думаю, что более серьезный вопрос заключается в том, почему вы испытываете такие сетевые потери в реальной среде (а не в ваших симуляциях с использованием iptables для отбрасывания пакетов). - person Gary Russell; 23.01.2015
comment
Я просто моделирую проблему конфигурации сетевого раздела / брандмауэра в производственной среде. Спасибо, я могу попробовать удалить эксклюзивную очередь, чтобы посмотреть, поможет ли это. - person Amir Moghimi; 23.01.2015
comment
В моем случае проблема действительно заключалась в следующем: RabbitMQ подумал, что старые соединения все еще существуют, и запретил контейнеру Spring AMQP повторно объявлять эксклюзивную анонимную очередь. Контейнер Spring AMQP пытался повторно объявить очередь только 4 раза, но затем отказался. После этого: мое приложение все еще работает, сетевое соединение в порядке, но сообщения не получены. Слава богу, в SimpleRabbitListenerContainerFactory есть опция missingQueuesFatal, которая заставляет контейнер AMQP Spring пытаться повторно объявлять очередь бесконечно (а не только 4 раза)! Или мы можем установить requestedHeartbeat меньше retryCount * interval. - person Ruslan Stelmachenko; 05.07.2016

Мы также сталкиваемся с этой проблемой в нашей производственной среде, возможно, из-за того, что узлы Rabbit работают как виртуальные машины на разных стойках ESX и т. Д. Обходной путь, который мы обнаружили, заключался в том, чтобы наше клиентское приложение постоянно пыталось повторно подключиться, если оно отключается от кластера. . Ниже приведены настройки, которые мы применили, и они сработали:

<util:properties id="spring.amqp.global.properties">
  <prop key="smlc.missing.queues.fatal">false</prop>
</util:properties>

Этот атрибут изменяет глобальное поведение Spring AMQP при сбое объявления очередей из-за фатальных ошибок (брокер недоступен и т. Д.). По умолчанию контейнер пытается только 3 раза (см. Сообщение журнала, показывающее «retries left = 0»).

Ссылка: http://docs.spring.io/spring-amqp/reference/htmlsingle/#containerAttributes

Кроме того, мы добавили интервал восстановления, чтобы контейнер восстанавливался после нефатальных ошибок. Однако та же самая конфигурация также используется, когда глобальное поведение заключается в повторении попыток и при фатальных ошибках (например, при отсутствии очередей).

<rabbit:listener-container recovery-interval="15000" connection-factory="consumerConnectionFactory">
....
</rabbit:listener-container>
person Shreerang    schedule 16.02.2016

Установите setRequestedHeartBeat на ConnectionFactory и setMissingQueuesFatal(false) на SimpleMessageListenerContainer, чтобы повторять попытки подключения на неопределенный срок. По умолчанию для SimpleMessageListenerContainer setMissingQueuesFatal установлено значение true, и будет выполнено только 3 попытки.

  @Bean
  public ConnectionFactory connectionFactory() {
    final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHost(), getPort());
    connectionFactory.setUsername(getUsername());
    connectionFactory.setPassword(getPassword());
    connectionFactory.setVirtualHost(getVirtualHost());
    connectionFactory.setRequestedHeartBeat(30);
    return connectionFactory;
  }

  @Bean
  public SimpleMessageListenerContainer listenerContainerCopernicusErrorQueue() {
    final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(myQueue().getName());
    container.setMessageListener(messageListenerAdapterQueue());
    container.setDefaultRequeueRejected(false);
    container.setMissingQueuesFatal(false);
    return container;
  }
person L. G.    schedule 02.02.2017