У меня есть потоки интеграции Spring, которые мне нужно снова использовать.
@Bean
public IntegrationFlow sendToKafkaFlowRequest(@Value("${kafka.document-consume-topic}") String topic,
ProducerFactory<?, Message> producerFactory) {
return IntegrationFlows.from("kafkaRequestChannel")
.handle(Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.topicExpression("headers[kafka_topic] ?: '" + topic + "'"))
.get();
}
@Bean
public IntegrationFlow listeningFromKafkaFlow(@Value("${kafka.document-consume-topic}") String topic,
ConsumerFactory<?, Message> consumerFactory) {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory, ListenerMode.record, topic)
.configureListenerContainer(c -> c.ackMode(AbstractMessageListenerContainer.AckMode.RECORD))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.channel("interMessageChannel")
.get();
}
Я хотел снова и снова использовать эти два потока для нескольких тем. Но проблема в том, что тема жестко запрограммирована. Вопрос в том, можем ли мы использовать заголовки сообщения, чтобы поместить в него название темы? Будет ли это проблемой ?