Конфигурация динамического имени темы для Spring Kafka при использовании адаптеров Kafka для интеграции Spring?

У меня есть потоки интеграции Spring, которые мне нужно снова использовать.

@Bean
public IntegrationFlow sendToKafkaFlowRequest(@Value("${kafka.document-consume-topic}") String topic,
                                              ProducerFactory<?, Message> producerFactory) {
    return IntegrationFlows.from("kafkaRequestChannel")
            .handle(Kafka
                    .outboundChannelAdapter(producerFactory)
                    .messageKey(m -> m
                            .getHeaders()
                            .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                    .topicExpression("headers[kafka_topic] ?: '" + topic + "'"))
            .get();
}


@Bean
public IntegrationFlow listeningFromKafkaFlow(@Value("${kafka.document-consume-topic}") String topic,
                                              ConsumerFactory<?, Message> consumerFactory) {
    return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory, ListenerMode.record, topic)
            .configureListenerContainer(c -> c.ackMode(AbstractMessageListenerContainer.AckMode.RECORD))
            .retryTemplate(new RetryTemplate())
            .filterInRetry(true))
            .channel("interMessageChannel")
            .get();
}

Я хотел снова и снова использовать эти два потока для нескольких тем. Но проблема в том, что тема жестко запрограммирована. Вопрос в том, можем ли мы использовать заголовки сообщения, чтобы поместить в него название темы? Будет ли это проблемой ?


person Yashdeep Sharma    schedule 29.09.2017    source источник


Ответы (1)


OK. Послушайте, Kafka.messageDrivenChannelAdapter() может принять несколько тем, поэтому вам понадобится только один поток по этому вопросу.

Правильно, вы всегда можете установить заголовок kafka_topic в сообщение перед отправкой в ​​Kafka через Kafka.outboundChannelAdapter(). И снова: здесь тоже не нужно дублировать IntegrationFlow. Единственное, что нужно отправить в тему, разрешенную против сообщения, вполне достаточно.

person Artem Bilan    schedule 29.09.2017
comment
А как насчет listenFromKafkaFlow (), как не дублировать его, как я могу использовать сообщение, не дублируя один и тот же поток? - person Yashdeep Sharma; 29.09.2017
comment
Я же сказал вам: Kafka.messageDrivenChannelAdapter() может принимать vararg для топиков. Вы не можете просто вставить все свои темы в определение listeningFromKafkaFlow bean-компонента и делегировать их Kafka.messageDrivenChannelAdapter() - person Artem Bilan; 29.09.2017