Я использую интеграцию Spring 5.0.6. Я прошел через этот документ и создал следующий код, который прослушивает конечную точку HTTP и публикует в теме kafka.
Все работает нормально, и я также получаю сообщение в теме. Но на HTTP-клиенте ответ не отправлен, он дает «Ответ не получен в течение тайм-аута».
Как я могу отправить ответ вызывающей стороне http в приведенном ниже коде:
@Bean
public DirectChannel replyChannel() {
return new DirectChannel();
}
@Bean(name = "restInputFlow")
public IntegrationFlow send() {
return IntegrationFlows
.from(Http.inboundGateway("/push").requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class).replyChannel(replyChannel()))
.transform(new Transformer())
.handle(kafkaMessageHandler(producerFactory(), getKafkaSourceTopic()))
.enrichHeaders(
c -> c.header(org.springframework.integration.http.HttpHeaders.STATUS_CODE, HttpStatus.CREATED))
.get();
}
private KafkaProducerMessageHandlerSpec<GenericRecord, GenericRecord, ?> kafkaMessageHandler(
ProducerFactory<GenericRecord, GenericRecord> producerFactory, String topic) {
return Kafka.outboundChannelAdapter(producerFactory)
.messageKey("key").headerMapper(mapper())
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
Спасибо за любую помощь.