Я создал потребителя Kafka в Apache Flink API, написанном на Scala. Всякий раз, когда я передаю какие-то сообщения из темы, она должным образом их получает. Однако, когда я перезапускаю потребителя, вместо получения новых или неиспользованных сообщений он потребляет последнее сообщение, отправленное в эту тему.
Вот что я делаю:
Запуск продюсера:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
Запуск потребителя:
val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("zookeeper.connect", "localhost:2181") properties.setProperty("group.id", "test") val env = StreamExecutionEnvironment.getExecutionEnvironment val st = env .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties)) env.enableCheckpointing(5000) st.print() env.execute()
Передача некоторых сообщений
- Остановка потребителя
- При повторном запуске потребителя печатается последнее отправленное мной сообщение. Я хочу, чтобы он печатал только новые сообщения.