В потоке XD сообщения потребляются из темы Kafka через исходный модуль, а затем отправляются в принимающий модуль Kafka. Причина разработки пользовательских модулей источника и приемника Kafka заключается в том, что я хочу обновлять смещения из исходного модуля только тогда, когда я получаю подтверждение от нижестоящего модуля приемника при успешно отправленных сообщениях.
Я использую Spring Integration Kafka 2.0.1.RELEASE и Spring Kafka 1.0.3.RELEASE с темами в среде Kafka 0.10.0.0. Я пробовал следующее:
Конфигурация исходного модуля:
@Configuration
public class ModuleConfiguration {
@Value("${topic}")
private String topic;
@Value("${brokerList}")
private String brokerAddress;
@Bean
public SubscribableChannel output() {
DirectChannel output = new DirectChannel();
return output;
}
@Autowired
TopicPartitionInitialOffset topicPartition;
@Bean
public TopicPartitionInitialOffset topicPartition(){
return new TopicPartitionInitialOffset(this.topic, 0, (long) 0);
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties containerProps = new ContainerProperties(topicPartition);
containerProps.setAckMode(AckMode.MANUAL);
KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory(),containerProps);
return kafkaMessageListenerContainer;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
DefaultKafkaConsumerFactory<String,String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
return consumerFactory;
}
}
Исходный модуль: InboundKafkaMessageDrivenAdapter
@MessageEndpoint
@Import(ModuleConfiguration.class)
public class InboundKafkaMessageDrivenAdapter {
@Autowired
KafkaMessageListenerContainer<String, String> container;
@Autowired
SubscribableChannel output;
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(container);
kafkaMessageDrivenChannelAdapter.setOutputChannel(output);
return kafkaMessageDrivenChannelAdapter;
}
}
Модуль приемника: конфигурация
@Configuration
@EnableIntegration
public class ModuleConfiguration {
@Value("${topic}")
private String topic;
@Value("${brokerList}")
private String brokerAddress;
@Bean
public KafkaProducerMessageHandler<String,String> handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression(this.topic));
return handler;
}
@Bean
public SubscribableChannel input() {
return new DirectChannel();
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
}
Модуль приемника: SinkActivator
@Import(ModuleConfiguration.class)
@MessageEndpoint
public class SinkActivator {
@Autowired
KafkaProducerMessageHandler<String,String> handler;
@Autowired
SubscribableChannel input;
@ServiceActivator(inputChannel = "input")
public void sendMessage(Message<?> msg) throws Exception{
Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
handler.handleMessage(msg);
acknowledgment.acknowledge();
}
}
Источник успешно получает сообщения и отправляет их в приемник, однако, когда я пытаюсь получить подтверждение в приемнике:
Подтверждение подтверждения = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
Выдается следующее исключение:
Причина: java.lang.IllegalArgumentException: для заголовка 'kafka_acknowledgment' указан неверный тип. Ожидается [interface org.springframework.kafka.support.Acknowledgment], но фактический тип — [class org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment]
В исходном коде для spring-integration-kafka-2.0.1.RELEASE класс KafkaMessageListenerContainer, когда AckMode=MANUAL, к сообщению добавляется заголовок kafka_acknowledgment, однако тип является внутренним статическим классом ConsumerAcknowldgment.
Итак, как мне получить подтверждение от модуля приемника для сообщения, отправленного из источника?