Я использую JMS Outbound Gateway
для отправки сообщений в очередь запросов и получения сообщений из отдельной очереди ответов. Я хотел бы добавить функциональность, чтобы вызов определенного метода bean-компонента выполнялся после того, как сообщение было успешно отправлено в очередь запросов.
Для этого я использую spring-integration 4.0.4
API и spring-integration-java-dsl 1.0.0
API, и до сих пор мне удавалось реализовать вышеуказанную функциональность следующим образом:
@Configuration
@EnableIntegration
public class IntegrationConfig {
...
@Bean
public IntegrationFlow requestFlow() {
return IntegrationFlows
.from("request.ch")
.routeToRecipients(r ->
r.ignoreSendFailures(false)
.recipient("request.ch.1", "true")
.recipient("request.ch.2", "true"))
.get();
}
@Bean
public IntegrationFlow sendReceiveFlow() {
return IntegrationFlows
.from("request.ch.1")
.handle(Jms.outboundGateway(cachingConnectionFactory)
.receiveTimeout(45000)
.requestDestination("REQUEST_QUEUE")
.replyDestination("RESPONSE_QUEUE")
.correlationKey("JMSCorrelationID"), e -> e.requiresReply(true))
.channel("response.ch").get();
}
@Bean
public IntegrationFlow postSendFlow() {
return IntegrationFlows
.from("request.ch.2")
.handle("requestSentService", "fireRequestSuccessfullySentEvent")
.get();
}
...
}
Теперь, хотя приведенная выше конфигурация работает, я заметил, что единственная очевидная причина, по которой request.ch.1
вызывается раньше, чем request.ch.2
, связана с алфавитным порядком имен каналов, а не с порядком, в котором они были добавлены к самому RecipientListRouter
. Это верно? Или я что-то здесь упускаю?
* В ИЗМЕНЕНИИ ниже показано решение с использованием агрегатора между исходящими/входящими адаптерами JMS (без шлюза обмена сообщениями) *
Конфигурация интеграции:
@Configuration
@EnableIntegration
public class IntegrationConfig {
...
@Bean
public IntegrationFlow reqFlow() {
return IntegrationFlows
.from("request.ch")
.enrichHeaders(e -> e.headerChannelsToString())
.enrichHeaders(e -> e.headerExpression(IntegrationMessageHeaderAccessor.CORRELATION_ID, "headers['" + MessageHeaders.REPLY_CHANNEL + "']"))
.routeToRecipients(r -> {
r.ignoreSendFailures(false);
r.recipient("jms.req.ch", "true");
r.recipient("jms.agg.ch", "true");
})
.get();
}
@Bean
public IntegrationFlow jmsReqFlow() {
return IntegrationFlows
.from("jms.req.ch")
.handle(Jms.outboundAdapter(cachingConnectionFactory)
.destination("TEST_REQUEST_CH")).get();
}
@Bean
public IntegrationFlow jmsPostReqFlow() {
return IntegrationFlows
.from("jms.req.ch")
.handle("postSendService", "postSendProcess")
.get();
}
@Bean
public IntegrationFlow jmsResFlow() {
return IntegrationFlows
.from(Jms.inboundAdapter(cachingConnectionFactory).destination(
"TEST_RESPONSE_CH"),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
.channel("jms.agg.ch").get();
}
@Bean
public IntegrationFlow jmsAggFlow() {
return IntegrationFlows
.from("jms.agg.ch")
.aggregate(a -> {
a.outputProcessor(g -> {
List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages());
Message<?> firstMessage = l.get(0);
Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage;
Message<?> messageOut = MessageBuilder.fromMessage(lastMessage)
.setHeader(MessageHeaders.REPLY_CHANNEL, (String) firstMessage.getHeaders().getReplyChannel())
.build();
return messageOut;
});
a.releaseStrategy(g -> g.size() == 2);
a.groupTimeout(45000);
a.sendPartialResultOnExpiry(false);
a.discardChannel("jms.agg.timeout.ch");
}, null)
.channel("response.ch")
.get();
}
}
@Bean
public IntegrationFlow jmsAggTimeoutFlow() {
return IntegrationFlows
.from("jms.agg.timeout.ch")
.handle(Message.class, (m, h) -> new ErrorMessage(new MessageTimeoutException(m), h))
.channel("error.ch")
.get();
}
}
Привет, ПМ