Spring Integration DSL отвечает успешно, если успешная доставка в очередь

Я пытаюсь открыть конечную точку HTTP, которая отправит сообщение в очередь JMS. Я хочу ответить Успехом, если доставка прошла успешно, и ОШИБКОЙ, если сообщение не может быть доставлено.

 @Bean
    public IntegrationFlow systemTaskCall(MapToServiceTaskConfigTransformer mapTransformer, CachingConnectionFactory jmsConnectionFactory) {
        return IntegrationFlows.from(
                Http.inboundGateway("/spartaSystemTask")
                        .requestMapping(r -> r
                                .methods(HttpMethod.POST)
                                .consumes("application/json")
                        )
                        .requestPayloadType(Map.class)
                        .replyChannel(RESPONSE_CHANNEL)
                        .errorChannel("errorChannel")
        )
                .handle((payload, headers) -> mapTransformer.transform((Map<String, String>) payload))
                .enrichHeaders(Collections.singletonMap(DESTINATION_QUEUE, "request.queue"))
                .enrichHeaders(Collections.singletonMap(JMS_REPLY_TO, "response.queue"))
                .transform(Transformers.toJson())
                .handle(
                        Jms.outboundGateway(jmsConnectionFactory,)
                            .requestDestination(message -> message.getHeaders().get(DESTINATION_QUEUE))

                )
                .log(LoggingHandler.Level.ERROR)
                .enrichHeaders(
                        c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED)
                )
                .transform(source -> "SUCCESS")
                .transform(Transformers.toJson())
                .channel(RESPONSE_CHANNEL)
                .get();
    }

 @Bean
    public IntegrationFlow errorFlow(){
        return IntegrationFlows.from("errorChannel")
                .transform(source -> "error")
                .transform(Transformers.toJson())
                .channel(RESPONSE_CHANNEL)
                .get();
    }

Когда я вызываю этот URL-адрес, сообщение удаляется, но время ожидания HTTP-вызова истекает. Кажется, остальная часть кода вызова исходящего шлюза JMS не выполняется.

В случае сбоя доставки сообщения я получаю правильный ответ.


person Prachit Raorane    schedule 27.01.2020    source источник


Ответы (1)


У вас есть эта конфигурация:

.handle(
                    Jms.outboundGateway(jmsConnectionFactory,)
                        .requestDestination(message -> message.getHeaders().get(DESTINATION_QUEUE))

            )
  • outboundGateway. Это означает, что вы отправляете запрос и ожидаете ответа от другой стороны, но похоже, что вы только отправляете сообщение JMS в очередь, и никто на другой стороне слушателя не отвечает вам чем-то в этом response.queue. Это причина того, что вы не можете получить обычную публикацию JMS.

Вы должны убедиться, что логика в вашем потоке верна, и в вашем распределенном решении действительно допустимо, что вы ожидаете какой-то ответ со стороны сервера.

В противном случае вам нужно подумать об изменении вашей логики на Jms.outboundAdapter(), который действительно является односторонним отправителем. Для ответа HTTP вы можете использовать publishSubscribeChannel() с этим Jms.outboundAdapter() в качестве первого подписчика, а остальную часть вашего потока - в качестве второго. Таким образом, второй подписчик не будет вызван до тех пор, пока первый не закончит свою логику должным образом. В случае ошибки вы можете обернуть Jms.outboundAdapter() ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/5.2.3.RELEASE/reference/html/messaging-endpoints.html#цепочкасоветов-обработчиковсообщений

person Artem Bilan    schedule 27.01.2020