Фиксация смещения Kafka-Streams - повторная обработка очень старых сообщений

Мы используем apache kafka-streams 0.10.2.0 в приложении. Мы используем топологию kafka-streams для передачи обработанных данных в следующую тему до конца обработки.

Кроме того, мы используем контейнер AWS ECS для развертывания потребительского приложения. Мы заметили, что потребитель выбирает для обработки очень старые сообщения, хотя они были обработаны раньше. Эта проблема возникает случайным образом во время увеличения или уменьшения масштаба службы или в случае новых развертываний. Я понимаю, что во время ребалансировки потребителей некоторые сообщения могут быть повторно обработаны. Но в данном случае это повторная обработка большого количества сообщений, которые были успешно обработаны долгое время (более 10 дней назад).

Мы не можем понять основную причину этой проблемы. Разве это не правильная фиксация смещений и сбор случайных сообщений в разной топологии. Это приводит к несогласованному поведению одного сообщения, повторно обрабатываемого в любой топологии.

Удивительно, но мы также не видим никаких исключений в отношении потребителя. Пожалуйста, помогите.

Вот конфигурации, которые мы используем:

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,"UniqueKey");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,key);
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000));
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6));

Вот фрагмент кода для процессоров:

    final KStreamBuilder builder = new KStreamBuilder();
    builder.addSource(key, Serdes.String().deserializer(), executor.getDeserializer(), key);
    builder.addProcessor(key + "_processor", () -> new KafkaProcessor(), key);
    builder.addSink(key + "_sink", key + "_sink", key + "_processor");
    final KafkaStreams streams = new KafkaStreams(builder, StreamConfigurations.getStreamsConfgurations(key, kafkaHost));
    streams.start();
    streams.setUncaughtExceptionHandler((t, th) -> {
    _logger.error("UncaughtException in Kafka StreamThread  " + t.getName() + " exception = ", th.getMessage());
    });
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Я просмотрел некоторые блоги, посвященные повторной обработке kafka, и думаю попробовать еще несколько конфигураций, перечисленных ниже:

    streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
    streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); //default is 10000
    streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); //default is 30000
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    streamsConfiguration.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000); //default is 5000
    streamsConfiguration.put(ProducerConfig.ACKS_CONFIG,1);
    streamsConfiguration.put(ProducerConfig.RETRIES_CONFIG,10);

Спасибо, Альпа


person Alpa Khatri    schedule 03.05.2017    source источник
comment
Я предлагаю задать этот вопрос в списке рассылки пользователей kafka (kafka.apache.org/contact), где вам следует привлечь больше внимания к этому конкретному вопросу.   -  person Michael G. Noll    schedule 03.05.2017
comment
Некоторые вопросы: Что такое executor? Что делает KafkaProcessor, может, поделитесь своим кодом? Кроме того, несколько не связанный с IIRC, вы должны установить UncaughtExceptionHandler перед вызовом streams.start().   -  person Michael G. Noll    schedule 03.05.2017
comment
Executor - это интерфейс, который мы используем для реализации процессоров в топологии. Kafka Processor - это класс, который расширяет org.apache.kafka.streams.processor.AbstractProcessor ‹String, ExecutorWork ‹T›› и переопределяет метод процесса, который отвечает за прием сообщения и отправку ответа на следующую тему в топологии.   -  person Alpa Khatri    schedule 03.05.2017


Ответы (1)


Есть ли у вас журналы запросов на стороне брокера примерно в то время, когда произошла перебалансировка из-за масштабирования? Я подозреваю, что были некоторые ошибки в запросе / ответах смещения-выборки и последующей выборке (например, если темы выборки были усечены, и, следовательно, выборка, начинающаяся с зафиксированного смещения, возвращает исключение «вне диапазона», вызывающее его сбросить). Но все эти подозреваемые должны быть проверены на основе журналов запросов на стороне сервера.

person Guozhang Wang    schedule 08.05.2017