Spring Integration Java DSL HTTP ответ не получен из-за ошибки тайм-аута

Я использую интеграцию Spring 5.0.6. Я прошел через этот документ и создал следующий код, который прослушивает конечную точку HTTP и публикует в теме kafka.

Все работает нормально, и я также получаю сообщение в теме. Но на HTTP-клиенте ответ не отправлен, он дает «Ответ не получен в течение тайм-аута».

Как я могу отправить ответ вызывающей стороне http в приведенном ниже коде:

@Bean
public DirectChannel replyChannel() {
    return new DirectChannel();
}

@Bean(name = "restInputFlow")
public IntegrationFlow send() {
    return IntegrationFlows
            .from(Http.inboundGateway("/push").requestMapping(m -> m.methods(HttpMethod.POST))
                    .requestPayloadType(String.class).replyChannel(replyChannel()))
            .transform(new Transformer())
            .handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))
            .enrichHeaders(
                    c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
            .get();
}

private KafkaProducerMessageHandlerSpec<GenericRecord, GenericRecord, ?> kafkaMessageHandler(
            ProducerFactory<GenericRecord, GenericRecord> producerFactory, String topic) {

        return Kafka.outboundChannelAdapter(producerFactory)
                .messageKey("key").headerMapper(mapper())
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
    }

Спасибо за любую помощь.


person kleash    schedule 20.07.2018    source источник


Ответы (1)


Ваша проблема в том, что вы используете односторонний Kafka.outboundChannelAdapter(producerFactory). Это просто для "отправить и забыть".

Если вы заинтересованы в создании какого-либо последующего процесса или вам просто нужно ответить на HTTP-запрос, вам следует рассмотреть возможность использования:

/**
 * The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel}
 * method specific implementation to allow the use of the 'subflow' subscriber capability.
 * @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
 * {@link PublishSubscribeSpec} options including 'subflow' definition.
 * @return the current {@link IntegrationFlowDefinition}.
 */
public B publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer) {

В определении потока, где вашим первым подписчиком действительно будет этот Kafka.outboundChannelAdapter(producerFactory), а вторым может быть упомянутый .enrichHeaders(). Если вы больше ничего не сделаете, этот последний отправит свой результат в заголовок replyChannel и, следовательно, получит ответ HTTP.

В этом сценарии публикации-подписки вы должны иметь в виду, что payload для второго подписчика будет таким же, как вы пытаетесь отправить Kafka.

person Artem Bilan    schedule 20.07.2018
comment
Спасибо за быстрый ответ Артем. После преобразования () я заменил код на этот: .publishSubscribeChannel(c -> c .subscribe(sf -> sf.handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))) .subscribe(sf -> sf.enrichHeaders(msg -> msg.header( org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED)))) .get(); Но теперь он показывает исключение: нет доступных заголовков output-channel или replyChannel. - person kleash; 20.07.2018
comment
Я думаю, что у вас есть проблема в файле .transform(new Transformer()). Если вы создаете там свой собственный Message, вы также должны скопировать заголовки запросов. - person Artem Bilan; 20.07.2018