Как вручную зафиксировать смещения в исходном модуле Kafka при получении подтверждения от модуля приемника Kafka в Spring XD?

В потоке XD сообщения потребляются из темы Kafka через исходный модуль, а затем отправляются в принимающий модуль Kafka. Причина разработки пользовательских модулей источника и приемника Kafka заключается в том, что я хочу обновлять смещения из исходного модуля только тогда, когда я получаю подтверждение от нижестоящего модуля приемника при успешно отправленных сообщениях.

Я использую Spring Integration Kafka 2.0.1.RELEASE и Spring Kafka 1.0.3.RELEASE с темами в среде Kafka 0.10.0.0. Я пробовал следующее:

Конфигурация исходного модуля:

@Configuration
public class ModuleConfiguration {

    @Value("${topic}")
    private String topic;

    @Value("${brokerList}")
    private String brokerAddress;

    @Bean
    public SubscribableChannel output() {
        DirectChannel output = new DirectChannel();
        return output;
    }

    @Autowired
    TopicPartitionInitialOffset topicPartition;

    @Bean
    public TopicPartitionInitialOffset topicPartition(){
        return new TopicPartitionInitialOffset(this.topic, 0, (long) 0);    
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> container() throws Exception {
        ContainerProperties containerProps = new ContainerProperties(topicPartition);
        containerProps.setAckMode(AckMode.MANUAL);
        KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer = new KafkaMessageListenerContainer<>(consumerFactory(),containerProps);
        return kafkaMessageListenerContainer;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        DefaultKafkaConsumerFactory<String,String> consumerFactory =  new DefaultKafkaConsumerFactory<>(props);
        return consumerFactory;
    }
}

Исходный модуль: InboundKafkaMessageDrivenAdapter

@MessageEndpoint
@Import(ModuleConfiguration.class)
public class InboundKafkaMessageDrivenAdapter {

    @Autowired
    KafkaMessageListenerContainer<String, String> container;

    @Autowired
    SubscribableChannel output;

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(container);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(output);
        return kafkaMessageDrivenChannelAdapter;
    }
}

Модуль приемника: конфигурация

@Configuration
@EnableIntegration
public class ModuleConfiguration {

    @Value("${topic}")
    private String topic;

    @Value("${brokerList}")
    private String brokerAddress;

    @Bean
    public KafkaProducerMessageHandler<String,String> handler() throws Exception {
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setTopicExpression(new LiteralExpression(this.topic));
        return handler;
    }

    @Bean
    public SubscribableChannel input() {
        return new DirectChannel();
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}

Модуль приемника: SinkActivator

@Import(ModuleConfiguration.class)
@MessageEndpoint
public class SinkActivator {

    @Autowired
    KafkaProducerMessageHandler<String,String> handler;

    @Autowired
    SubscribableChannel input;

    @ServiceActivator(inputChannel = "input")
    public void sendMessage(Message<?> msg) throws Exception{
            Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            handler.handleMessage(msg);
            acknowledgment.acknowledge();
            }
}

Источник успешно получает сообщения и отправляет их в приемник, однако, когда я пытаюсь получить подтверждение в приемнике:

Подтверждение подтверждения = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

Выдается следующее исключение:

Причина: java.lang.IllegalArgumentException: для заголовка 'kafka_acknowledgment' указан неверный тип. Ожидается [interface org.springframework.kafka.support.Acknowledgment], но фактический тип — [class org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment]

В исходном коде для spring-integration-kafka-2.0.1.RELEASE класс KafkaMessageListenerContainer, когда AckMode=MANUAL, к сообщению добавляется заголовок kafka_acknowledgment, однако тип является внутренним статическим классом ConsumerAcknowldgment.

Итак, как мне получить подтверждение от модуля приемника для сообщения, отправленного из источника?


person Logan    schedule 19.08.2016    source источник


Ответы (1)


Если вы не используете локальный транспорт, вы не можете этого сделать, Acknowledgment является "живым" объектом и не может быть отправлен по сети другому модулю.

Если вы используете локальный транспорт, он будет работать, но у вас будут проблемы с загрузчиком классов, потому что каждый модуль работает в своем собственном загрузчике классов, а интерфейсы Acknowledgment являются разностными экземплярами класса.

Вам нужно будет переместить spring-integration-kafka и spring-kafka в папку xd/lib, чтобы классы загружались из общего загрузчика классов.

person Gary Russell    schedule 19.08.2016
comment
Быстрый вопрос: мы используем xd с транспортом Redis {по умолчанию}, поэтому наши модули, развернутые на xd, взаимодействуют через очереди Redis. Будет ли это считаться местным транспортом? Я так думаю, но просто хотел подтвердить с вами. - person Logan; 19.08.2016
comment
Нет; local означает, что приемник работает в том же потоке, что и источник — вы не можете передавать живой объект Acknowledgment через Redis. Подтверждение имеет состояние, связанное с потребителем источника, поэтому вы не можете удаленно подтверждать подобное. И локальный работает только с одним узлом. - person Gary Russell; 19.08.2016
comment
Еще один дополнительный вопрос: Итак, в нашем потоке есть ли у вас какие-либо предложения по шаблонам архитектуры, где наш поток в XD исходный модуль Kafka будет подтверждать только тогда, когда сообщение успешно отправлено из модуля приемника. Это не подлежащее обсуждению решение, данное нам для всех потоков на нашей платформе XD. Мы будем очень признательны за предложение или обходной путь для решения. Спасибо еще раз. - person Logan; 19.08.2016
comment
Готового решения действительно не существует; вам нужно будет предоставить какое-то скрытое подтверждение обратно к источнику (возможно, через другой список повторных обращений). Источник должен будет сохранить ссылку на объект подтверждения и подтвердить его, когда придет подтверждение. Добро пожаловать в микросервисы :) Вы также должны быть осторожны с многопоточностью — вы можете подтвердить более позднее сообщение, в то время как более раннее сообщение не удалось, и более позднее смещение эффективно подтвердит ошибочное сообщение. - person Gary Russell; 19.08.2016
comment
Спасибо за ваш ответ! В соответствии с вашим предложением перейти на микросервисы, не могли бы вы намекнуть, как мы будем использовать поток данных Spring Cloud, Cloud Foundry или другие функции Spring Cloud для получения потока, источник которого подтверждает только успешную обработку сообщения приемника? Мы рассмотрели некоторые основы, но у нас нет опыта реализации, поэтому нам сложно визуализировать решение, поэтому любой дизайн/рекомендации по архитектуре высоко ценится. Я уверен, что это дело будет важным для многих предприятий. Спасибо за вашу помощь. - person Logan; 22.08.2016
comment
Я бы так не поступил. Я бы использовал RabbitMQ вместо Redis для транспорта; таким образом, вы можете быть уверены, что сообщение защищено кроликом, поэтому в это время можно обновить смещение в kafka. Если вы должны сделать это таким образом, рассмотрите возможность перехода с XD на spring-cloud-stream и используйте Агрегированное приложение, чтобы приемник выполнялся в исходном потоке. spring-cloud-stream/spring-cloud-dataflow — это проекты нового поколения для XD. - person Gary Russell; 22.08.2016
comment
Попытка сохранить состояние и обратную связь от приемника чревата трудностями. - person Gary Russell; 22.08.2016
comment
Если ваша рекомендация состоит в том, чтобы не полагаться на подтверждение приемника для обновления смещений kafka, как бы вы предложили решить проблему, когда у нас есть поток, который потребляет из темы kafka и после некоторой обработки отправляет в другую тему kafka и гарантирует, что обработка служба (остальный клиент в нашем случае) выходит из строя, мы приостанавливаем поток, и поток автоматически возвращается в онлайн, и нет потери данных? В основном автоматизированный механизм доставки не более одного раза, который является надежным. - person Logan; 22.08.2016