Назначение запроса исходящего шлюза JMS — последующая обработка в случае успеха

Я использую JMS Outbound Gateway для отправки сообщений в очередь запросов и получения сообщений из отдельной очереди ответов. Я хотел бы добавить функциональность, чтобы вызов определенного метода bean-компонента выполнялся после того, как сообщение было успешно отправлено в очередь запросов.

Для этого я использую spring-integration 4.0.4 API и spring-integration-java-dsl 1.0.0 API, и до сих пор мне удавалось реализовать вышеуказанную функциональность следующим образом:

@Configuration
@EnableIntegration
public class IntegrationConfig {

    ...

    @Bean
    public IntegrationFlow requestFlow() {

        return IntegrationFlows
            .from("request.ch")
            .routeToRecipients(r ->
                r.ignoreSendFailures(false)
                 .recipient("request.ch.1", "true")
                 .recipient("request.ch.2", "true"))
            .get();
    }

    @Bean
    public IntegrationFlow sendReceiveFlow() {

        return IntegrationFlows
            .from("request.ch.1")
            .handle(Jms.outboundGateway(cachingConnectionFactory)
                    .receiveTimeout(45000)
                    .requestDestination("REQUEST_QUEUE")
                    .replyDestination("RESPONSE_QUEUE")
                    .correlationKey("JMSCorrelationID"), e -> e.requiresReply(true))
                    .channel("response.ch").get();
    }

    @Bean
    public IntegrationFlow postSendFlow() {

        return IntegrationFlows
            .from("request.ch.2")
            .handle("requestSentService", "fireRequestSuccessfullySentEvent")
            .get();
    }

    ...
}

Теперь, хотя приведенная выше конфигурация работает, я заметил, что единственная очевидная причина, по которой request.ch.1 вызывается раньше, чем request.ch.2, связана с алфавитным порядком имен каналов, а не с порядком, в котором они были добавлены к самому RecipientListRouter. Это верно? Или я что-то здесь упускаю?

* В ИЗМЕНЕНИИ ниже показано решение с использованием агрегатора между исходящими/входящими адаптерами JMS (без шлюза обмена сообщениями) *

Конфигурация интеграции:

@Configuration
@EnableIntegration
public class IntegrationConfig { 

    ...

    @Bean
    public IntegrationFlow reqFlow() {

        return IntegrationFlows
            .from("request.ch")
            .enrichHeaders(e -> e.headerChannelsToString())
            .enrichHeaders(e -> e.headerExpression(IntegrationMessageHeaderAccessor.CORRELATION_ID, "headers['" + MessageHeaders.REPLY_CHANNEL + "']"))             
            .routeToRecipients(r -> {
                r.ignoreSendFailures(false);
                r.recipient("jms.req.ch", "true");
                r.recipient("jms.agg.ch", "true");
            })
            .get();
    }

    @Bean
    public IntegrationFlow jmsReqFlow() {

        return IntegrationFlows
            .from("jms.req.ch")
            .handle(Jms.outboundAdapter(cachingConnectionFactory)
                    .destination("TEST_REQUEST_CH")).get();
    }

    @Bean
    public IntegrationFlow jmsPostReqFlow() {

        return IntegrationFlows
            .from("jms.req.ch")
            .handle("postSendService", "postSendProcess")
            .get();
    }

    @Bean
    public IntegrationFlow jmsResFlow() {

        return IntegrationFlows
            .from(Jms.inboundAdapter(cachingConnectionFactory).destination(
                    "TEST_RESPONSE_CH"),
                    c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
            .channel("jms.agg.ch").get();
    }

    @Bean
    public IntegrationFlow jmsAggFlow() {

        return IntegrationFlows
            .from("jms.agg.ch")
            .aggregate(a -> { 
                a.outputProcessor(g -> {
                    List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages());

                    Message<?> firstMessage = l.get(0);
                    Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage;

                    Message<?> messageOut = MessageBuilder.fromMessage(lastMessage)
                            .setHeader(MessageHeaders.REPLY_CHANNEL, (String) firstMessage.getHeaders().getReplyChannel())
                            .build();

                     return messageOut;
                }); 
                a.releaseStrategy(g -> g.size() == 2);
                a.groupTimeout(45000);
                a.sendPartialResultOnExpiry(false);
                a.discardChannel("jms.agg.timeout.ch");
            }, null)
            .channel("response.ch")
            .get();
        }
    }

    @Bean
    public IntegrationFlow jmsAggTimeoutFlow() {
        return IntegrationFlows
            .from("jms.agg.timeout.ch")
            .handle(Message.class, (m, h) -> new ErrorMessage(new MessageTimeoutException(m), h))
            .channel("error.ch")
            .get();
    }
}

Привет, ПМ


person Going Bananas    schedule 10.12.2014    source источник


Ответы (1)


Х-м... Похоже. Это действительно ошибка в логике DslRecipientListRouter: https://github.com/spring-projects/spring-integration-java-dsl/issues/9 Скоро будет исправлено и выпущено через пару дней.

Спасибо, что указали на это!

КСТАТИ. ваша логика немного неверна: даже когда мы исправим это RecipientListRouter, второй получатель получит то же сообщение запроса только после того, как JmsOutboundGateway получит reply, а не только после того, как запрос будет отправлен в очередь запросов. Заблокирован процесс запроса-ответа. И нет хука, чтобы получить точку между запросом и ответом в JmsOutboundGateway.

Это тебя устраивает?

person Artem Bilan    schedule 10.12.2014
comment
Я дам выпуск с исправлением, когда он будет доступен. Что касается вашего упоминания о логике: когда я впервые реализовал эту функцию, у меня была конфигурация, в которой request.ch был каналом pub-sub, и только sendReceiveFlow() и postSendFlow() питались от этого канала pub-sub. С этой конфигурацией у меня было поведение, которое вы описываете, но с конфигурацией, показанной в этом сообщении, кажется, что postSendFlow() вызывается только тогда, когда сообщение было успешно отправлено в очередь запросов и до того, как сообщение получено из очереди ответов. Завтра проведу тест и проверю... - person Going Bananas; 10.12.2014
comment
Наряду с исправлением recipinetList order были добавлены другие улучшения, такие как .recipient("request.ch.1") и .recipient(myChannel()). Другой особенностью является requiresReply = true по умолчанию для тех outboundGateway, которые аналогичны в конфигурации XML. Итак, с этим исправлением ваш код можно немного упростить. - person Artem Bilan; 11.12.2014
comment
Наконец-то у меня появилась возможность провести тест, и да, postSendFlow() определенно не вызывается, как я думал. Есть ли альтернативы, чтобы сделать эту работу? Если бы я, например, использовал JmsOutboundAdapter и JmsInboundAdapter` вместо JMS Outbound Gateway, мог бы я по-прежнему применять 45-секундное правило ожидания ответа для ответных сообщений? - person Going Bananas; 12.12.2014
comment
Как я уже сказал: любой исходящий шлюз — это черный ящик, и мы не можем подключиться между запросом и ответом. Если вы будете использовать пару Inboud- и OutboundAdapters, вам следует выполнить корреляцию вручную и... использовать агрегатор за шлюзом для достижения reply-timeout. - person Artem Bilan; 12.12.2014
comment
Здравствуйте @Artem, извините за задержку с ответом. Отвлекся на другую работу + каникулы и вернулся к этому только сейчас! Во всяком случае, я рассмотрел использование агрегатора, как вы предложили, и не мог понять (с небольшим чтением), как его применять в этой ситуации. Однако я пробую другой подход (о котором, я надеюсь, вы можете сказать мне, разумно ли это?). Как насчет того, чтобы настроить пару исходящих/входящих адаптеров JMS за простым шлюзом обмена сообщениями? Таким образом, я могу установить время ожидания ответа на шлюзе до того, как JMS отправит/получит части? Я отредактирую ветку основного вопроса, чтобы показать это. - person Going Bananas; 07.01.2015
comment
Без агрегатора запросов и ответов в виде группы для выпуска вы не сможете достичь корреляции шлюза TemporaryReplyChannel. Вы просто отправляете запрос адаптеру outbound и агрегатору. Адаптер inbound должен отправить свой результат агрегатору с тем же correlationKey, чтобы сгруппировать результат и вернуть его в ответ для шлюза. - person Artem Bilan; 07.01.2015
comment
О, я только что отредактировал основной пост, чтобы показать адаптеры JMS, стоящие за простым подходом к шлюзу. Но вы говорите, что этот подход все равно не сработает? - person Going Bananas; 07.01.2015
comment
Да, это правда. Поскольку ответ шлюза зависит от TemporaryReplyChannel и блокировки для вызывающего потока. Есть <header-channels-to-string>, конечно, но я не уверен, что вы сможете перевезти его в JMS и обратно. Однако вы можете попробовать... В противном случае вы должны иметь дело с агрегатором для пары запрос/ответ и опубликовать-подписаться на запрос для отправки в JMS и в агрегатор. - person Artem Bilan; 07.01.2015
comment
Я провел несколько тестовых сценариев обмена сообщениями с использованием JMS-адаптеров за простым подходом Messaging Gateway, и до сих пор все тесты были пройдены. Мои тесты включали: 1) отправить один запрос с ожиданием одного ответа, 2) отправить один запрос с ожиданием ответа НЕТ (что привело к тайм-ауту шлюза через 45 секунд), 3) отправить несколько запросов с ожиданием нескольких ответов, 4) отправить несколько запросов с ожиданием НЕТ ответы обратно (что приводит к тайм-ауту шлюза через 45 секунд для каждого переданного запроса). Все тесты пройдены. Чтобы уточнить, несколько запросов проходят через отдельные потоки (с использованием пула исполнителей xN Thread). - person Going Bananas; 08.01.2015
comment
Это правда, потому что вы используете e.headerChannelsToString(), и он делает то же самое для этих TemporaryReplyChannel для каждого потока. И похоже, что ваш материал JSM правильно передает эти заголовки по запросу/ответу. Значит ли это, что мы исправили ваши требования? - person Artem Bilan; 08.01.2015
comment
Исходя из этого, я теперь не уверен, какой подход, наконец, выбрать. Агрегаторный подход или JMS-адаптер(ы) на базе Messaging Gateway! Причина, по которой я хочу убедиться, что все сделано правильно, заключается в том, что мы находимся в процессе проверки общих шаблонов EIP, которые мы будем использовать для всех и всех потоков EIP на нашем предприятии, и мы хотим: a) убедиться, что потоки ведут себя как мы ожидаем, и б) потоки настолько очевидны, насколько это возможно для простоты последующего документирования (это причина, по которой я пытаюсь избежать подхода агрегатора в этой области поведения потока EIP). - person Going Bananas; 08.01.2015
comment
Мои тесты, кажется, показывают, что это работает, поэтому я бы сказал, что да, требование было выполнено! Если вы не думаете, что могут быть другие тестовые сценарии, которые я пропустил, которые я также должен проверить? - person Going Bananas; 08.01.2015
comment
Это не сработает, если другая сторона удалит требуемый заголовок replyChannel. В этом случае он будет рассматриваться как Gateway to time out after 45secs. Вот почему aggregator решение для сопоставления запроса и ответа для пары адаптеров является более надежным. - person Artem Bilan; 08.01.2015
comment
Я понимаю, что вы сейчас говорите, да. У нас нет никаких гарантий, что заголовок сообщения replyChannel будет сохранен за пределами нашего домена, понятно. Я попробую использовать подход Aggregator и повторно запущу тестовые сценарии. Спасибо еще раз! - person Going Bananas; 08.01.2015
comment
Здравствуйте, теперь я реализовал решение, используя Aggregator, как предложил @Artem. См. основное РЕДАКТИРОВАТЬ. Обратите внимание, что я полностью устранил необходимость в Messaging Gateway, и вместо этого сам Aggregator теперь заботится о требовании тайм-аута в 45 секунд (таким образом, Aggregator может очищать частичные группы при возникновении тайм-аутов). Все наши функциональные тесты проходят нормально с этим решением, но я не против узнать, является ли решение Aggregator, показанное в основном EDIT, настолько хорошим, насколько это возможно, или есть ли какие-либо улучшения, которые можно было бы внести в него? - person Going Bananas; 12.01.2015
comment
Да, есть много мест, которые можно улучшить, но если над этим работать, я бы сказал, давайте продолжим бизнес-логику! Прямо сейчас я хотел бы увидеть одобрение моего ответа - person Artem Bilan; 12.01.2015
comment
Вот комментарии к вашему решению: gist.github.com/artembilan/9ffba1bb0b18842e27cf - person Artem Bilan; 12.01.2015
comment
Спасибо за предложенные улучшения. Теперь я отредактировал решение на основе агрегатора, чтобы отразить это. - person Going Bananas; 12.01.2015