Мы используем apache kafka-streams 0.10.2.0 в приложении. Мы используем топологию kafka-streams для передачи обработанных данных в следующую тему до конца обработки.
Кроме того, мы используем контейнер AWS ECS для развертывания потребительского приложения. Мы заметили, что потребитель выбирает для обработки очень старые сообщения, хотя они были обработаны раньше. Эта проблема возникает случайным образом во время увеличения или уменьшения масштаба службы или в случае новых развертываний. Я понимаю, что во время ребалансировки потребителей некоторые сообщения могут быть повторно обработаны. Но в данном случае это повторная обработка большого количества сообщений, которые были успешно обработаны долгое время (более 10 дней назад).
Мы не можем понять основную причину этой проблемы. Разве это не правильная фиксация смещений и сбор случайных сообщений в разной топологии. Это приводит к несогласованному поведению одного сообщения, повторно обрабатываемого в любой топологии.
Удивительно, но мы также не видим никаких исключений в отношении потребителя. Пожалуйста, помогите.
Вот конфигурации, которые мы используем:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,"UniqueKey");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,key);
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000));
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6));
Вот фрагмент кода для процессоров:
final KStreamBuilder builder = new KStreamBuilder();
builder.addSource(key, Serdes.String().deserializer(), executor.getDeserializer(), key);
builder.addProcessor(key + "_processor", () -> new KafkaProcessor(), key);
builder.addSink(key + "_sink", key + "_sink", key + "_processor");
final KafkaStreams streams = new KafkaStreams(builder, StreamConfigurations.getStreamsConfgurations(key, kafkaHost));
streams.start();
streams.setUncaughtExceptionHandler((t, th) -> {
_logger.error("UncaughtException in Kafka StreamThread " + t.getName() + " exception = ", th.getMessage());
});
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Я просмотрел некоторые блоги, посвященные повторной обработке kafka, и думаю попробовать еще несколько конфигураций, перечисленных ниже:
streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); //default is 10000
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); //default is 30000
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
streamsConfiguration.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000); //default is 5000
streamsConfiguration.put(ProducerConfig.ACKS_CONFIG,1);
streamsConfiguration.put(ProducerConfig.RETRIES_CONFIG,10);
Спасибо, Альпа
executor
? Что делаетKafkaProcessor
, может, поделитесь своим кодом? Кроме того, несколько не связанный с IIRC, вы должны установить UncaughtExceptionHandler перед вызовомstreams.start()
. - person Michael G. Noll   schedule 03.05.2017