Я тестирую следующий сценарий в Spring AMQP v1.4.2, и он не может повторно подключиться после сбоя сети:
- Запустите приложение spring, которое асинхронно потребляет сообщения, используя rabbit: listener-container и rabbit: connection-factory (подробная конфигурация приводится ниже).
- Журнал показывает, что приложение успешно принимает сообщения.
- Сделайте RabbitMQ невидимым для приложения, отключив входящий сетевой трафик на сервере Rabbit:
sudo iptables -A INPUT -p tcp --destination-port 5672 -j DROP
- Подождите не менее 3 минут (для сетевых подключений истечет время ожидания).
- Исправьте соединение с:
sudo iptables -D INPUT -p tcp --destination-port 5672 -j DROP
- Подождите некоторое время (даже пробовал больше часа), и повторного подключения не происходит.
- Перезапустите приложение, и оно снова начнет получать сообщения, что означает, что сеть вернулась в нормальное состояние.
Я также протестировал тот же сценарий с отключением сетевого адаптера виртуальной машины вместо отключения iptables, и происходит то же самое, то есть без автоматического переподключения. Интересно, что когда я пробую iptables REJECT вместо DROP, он работает должным образом, и приложение перезапускается, как только я удаляю правило отклонения, но я думаю, что отклонение больше похоже на сбой сервера, чем на сбой сети. .
Согласно справочному документу:
Если MessageListener выходит из строя из-за бизнес-исключения, исключение обрабатывается контейнером прослушивателя сообщений, а затем он возвращается к прослушиванию другого сообщения. Если сбой вызван обрывом соединения (а не исключением для бизнеса), то потребителя, который собирает сообщения для слушателя, необходимо отменить и перезапустить. SimpleMessageListenerContainer обрабатывает это без проблем и оставляет журнал, в котором говорится, что прослушиватель перезапускается. Фактически, он бесконечно зацикливается, пытаясь перезапустить потребителя, и только если потребитель ведет себя очень плохо, действительно ли он сдаться. Один из побочных эффектов заключается в том, что если брокер не работает при запуске контейнера, он будет продолжать попытки до тех пор, пока не будет установлено соединение.
Это журнал, который я получаю примерно через минуту после отключения:
2015-01-16 14:00:42,433 WARN [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
... 1 common frames omitted
И я получаю это сообщение журнала через несколько секунд после переподключения:
2015-01-16 14:18:14,551 WARN [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out
ОБНОВЛЕНИЕ: Как ни странно, когда я включаю ведение журнала DEBUG в пакете org.springframework.amqp, переподключение происходит успешно, и я больше не могу воспроизвести проблему!
Не включив ведение журнала отладки, я попытался отладить код Spring AMQP. Я заметил, что вскоре после удаления iptables drop вызывается метод SimpleMessageListenerContainer.doStop()
, который, в свою очередь, вызывает shutdown () и отменяет все каналы. Я также получил это сообщение журнала, когда поставил точку останова на doStop (), которая, похоже, связана с причиной:
2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer
ОБНОВЛЕНИЕ 2: после установки requested-heartbeat
на 30 секунд, как было предложено в ответе, повторное подключение работало большую часть времени и успешно переопределило эксклюзивную временную очередь, привязанную к обмену разветвлениями, но это все еще не удалось повторно подключайтесь время от времени.
В тех редких случаях, когда это не удалось, я наблюдал за консолью управления RabbitMQ во время теста и наблюдал, что новое соединение было установлено (после того, как старое соединение было удалено по таймауту), но эксклюзивная временная очередь не была переопределена после повторного подключения. Также клиент не получал никаких сообщений. Сейчас действительно сложно воспроизвести проблему достоверно, так как это случается реже. Я предоставил полную конфигурацию ниже, теперь содержащую объявления очереди.
ОБНОВЛЕНИЕ 3: даже после замены монопольной временной очереди именованной очередью с автоматическим удалением иногда происходит то же самое; т.е. именованная очередь с автоматическим удалением не переопределяется после повторного подключения, и сообщения не принимаются до перезапуска приложения.
Я был бы очень признателен, если бы кто-нибудь мог мне в этом помочь.
Вот весенняя конфигурация AMQP, на которую я полагаюсь:
<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>
<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
<rabbit:bindings>
<rabbit:binding queue="control-queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
acknowledge="none"
concurrency="1"
prefetch="1">
<rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>
</rabbit:listener-container>
<rabbit:connection-factory id="connection-factory"
username="${rabbit.username}"
password="${rabbit.password}"
host="${rabbit.host}"
virtual-host="${rabbit.virtualhost}"
publisher-confirms="true"
channel-cache-size="100"
requested-heartbeat="30" />
<rabbit:admin id="admin" connection-factory="connection-factory"/>
<rabbit:queue id="qu0-id" name="qu0">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="dead-letter"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
<rabbit:bindings>
<rabbit:binding queue="qu0" pattern="p.0"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:listener-container connection-factory="connection-factory"
acknowledge="manual"
concurrency="4"
prefetch="30">
<rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>
org.springframework.amqp.rabbit.listener
на уровнеDEBUG
, чтобы получить дополнительную информацию по этому вопросу? Кстати, я просто пробовал аналогичную (или нет?) Эмуляцию сtcpTrace
в Windows и вижу похожиеCaused by: java.io.EOFException: null at java.io.DataInputStream.readUnsignedByte
в журналах. Но когда я перезапускаюtrace
, соединение восстанавливается. Мой клиент AMQP3.4.2
- транзитивная зависимость от Spring AMQP. - person Artem Bilan   schedule 16.01.2015