Диспетчер не имеет подписчиков при попытке корректно завершить работу приложения

У меня есть следующие требования к моему заявлению:

У меня есть поток интеграции, который берет файлы из каталога через

Files.inboundAdapter

и конфигурацию опроса следующим образом:

@Bean public PollerSpec orderOutboundFlowTempFileInPoller() {
    return Pollers
        .fixedDelay(pollerDelay)
        .maxMessagesPerPoll(100)
        .transactional();

}

Файлы должны быть переданы на удаленный хост через RemoteFileTemplate. Приложение работает в докер-контейнере, который должен быть остановлен для обслуживания или развертывания. Когда контейнер выключен, поток должен закончить запись файла на удаленный хост и не должен принимать новые входящие файлы. Поэтому я реализовал плавное завершение работы следующим образом:

@Override public void onApplicationEvent(final ContextClosedEvent event) {
  LOG.info("Trying to gracefully shutdown App");
  //CHECKSTYLE:OFF
  allFlowPollers.forEach(
    p -> {
      try {
        p.destroy();
      } catch (final Exception e) {
        LOG.warn("Unable to destroy poller.");
      }
    }
  );
  //CHECKSTLYE:ON
  FLOWS_TO_SHUTDOWN.forEach(GracefulShutdownAware::shutdown);
}

Я предположил, что когда я уничтожу опросчики, из источника больше не будет считываться файл. RemoteFileTemplate правильно отправляет текущий файл, проблем нет. Но опросчик по-прежнему получает новые файлы, и когда приложение почти закрывается, появляется следующее исключение:

timestamp=15:55:56.599, thread=task-scheduler-2, severity=ERROR, class=o.s.i.h.LoggingHandler, message=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.flowTempFileIn.channel#0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=/servicedata/tmp/1544088550162_57280.xml, headers={file_originalFile=/servicedata/tmp/1544088550162_57280.xml, id=d04473ff-d1bd-173e-d801-b7b9fd31596c, file_name=1544088550162_57280.xml, file_relativePath=1544088550162_57280.xml, timestamp=1544108156591}], failedMessage=GenericMessage [payload=/servicedata/tmp/1544088550162_57280.xml, headers={file_originalFile=/servicedata/tmp/1544088550162_57280.xml, id=d04473ff-d1bd-173e-d801-b7b9fd31596c, file_name=1544088550162_57280.xml, file_relativePath=1544088550162_57280.xml, timestamp=1544108156591}]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:227)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:290)
        at sun.reflect.GeneratedMethodAccessor292.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:197)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
        at com.sun.proxy.$Proxy138.call(Unknown Source)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:391)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:385)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=/servicedata/tmp/1544088550162_57280.xml, headers={file_originalFile=/servicedata/tmp/1544088550162_57280.xml, id=d04473ff-d1bd-173e-d801-b7b9fd31596c, file_name=1544088550162_57280.xml, file_relativePath=1544088550162_57280.xml, timestamp=1544108156591}]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        ... 33 more

Есть ли другой способ выполнить это требование? Что такое: остановка приложения должна позволить текущему файлу завершить работу, не принимать дальнейшие файлы для чтения и по-прежнему закрывать без каких-либо странных исключений?

Я предполагаю, что это какая-то проблема с синхронизацией, поскольку поток интеграции и реакция на ContextClosedEvent выполняются в разных потоках. Поллер не уничтожен полностью, но преобразователь-подписчик исходящего канала уже уничтожен.

Я также пытался остановить опросчик через шину управления, но результат был таким же.

Заранее спасибо :)


person Matthias Hanske    schedule 06.12.2018    source источник


Ответы (1)


Проблема не в опросчике, а в InboundChannelAdapter. Было бы здорово увидеть ваше IntegrationFlow определение. Фактически он должен останавливаться при завершении работы контекста приложения. И больше ничего делать со своей стороны не нужно.

Дело в том, что все активные компоненты Spring Integration реализуют SmartLifecycle, и они stop() изящно управляются во время соответствующей фазы закрытия контекста приложения.

person Artem Bilan    schedule 06.12.2018
comment
Я добавил полное определение потока в разделе: pastebin.com/KDj8ubHE Как видите, один поток разделен на более мелкие частей, использующих несколько IntegrationFlow, соединенных DirectChannels. Само выключение, как описано выше, представляет собой ApplicationListener в ContextClosedEvent, который делегирует выключение конкретному компоненту выключения для каждого потока. В случае этого конкретного OutboundFlow завершение работы реализовано, чтобы дождаться завершения текущей передачи и заблокировать сообщения, ожидающие в текущем опросе. - person Matthias Hanske; 07.12.2018
comment
Все в вашем коде выглядит хорошо. Как я уже сказал: вам не нужна никакая настраиваемая логика уничтожения, если вы просто правильно полагаетесь на закрытие контекста приложения. Сначала будет остановлен Files.inboundAdapter() и только все остальные пассивные компоненты обработки сообщений. Было бы здорово получить от вас какой-нибудь минимальный проект Spring Boot, раскрывающий эту проблему. И, конечно же, без Докера. Это сбивает с толку и не похоже на то, что связано ... - person Artem Bilan; 07.12.2018