Кафка снова потребляет последнее сообщение, когда я перезапускаю клиент Flink

Я создал потребителя Kafka в Apache Flink API, написанном на Scala. Всякий раз, когда я передаю какие-то сообщения из темы, она должным образом их получает. Однако, когда я перезапускаю потребителя, вместо получения новых или неиспользованных сообщений он потребляет последнее сообщение, отправленное в эту тему.

Вот что я делаю:

  1. Запуск продюсера:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
    
  2. Запуск потребителя:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000)
    st.print()
    env.execute()
    
  3. Передача некоторых сообщений

  4. Остановка потребителя
  5. При повторном запуске потребителя печатается последнее отправленное мной сообщение. Я хочу, чтобы он печатал только новые сообщения.

person Piyush Shrivastava    schedule 23.03.2016    source источник


Ответы (1)


Вы запускаете потребителя Kafka с интервалом контрольной точки 5 секунд. Таким образом, каждые 5 секунд Flink создает копию состояния вашего оператора (смещения) для восстановления.

Как только контрольная точка будет завершена, оператор узнает, что контрольная точка завершена. В этом уведомлении потребитель Kafka передает смещения Zookeeper. Таким образом, примерно каждые 5 секунд мы записываем смещения последней контрольной точки в ZK.

Когда вы снова запустите задание Flink, оно найдет смещения в ZK и продолжит работу оттуда. В зависимости от времени все сообщения, полученные после фиксации в ZK, будут отправлены снова.

Вы не можете избежать этого, потому что .print() «оператор» не является частью контрольной точки. Это означало как утилиту отладки. Однако приемник данных, который участвует в установлении контрольных точек (например, приемник скользящего файла), гарантирует, что в файловую систему не будут записаны дубликаты.

person Robert Metzger    schedule 23.03.2016
comment
не могли бы вы указать путь в zookeeper, где будет храниться информация о смещении? - person vishnu viswanath; 22.06.2016
comment
Посетите эту страницу: cwiki.apache.org/confluence / display / KAFKA / - person Robert Metzger; 23.06.2016
comment
спасибо @rmetzger, я выполнил задание flink и ожидал, что смещения будут сохранены в пути потребителей в zookeeper, но под потребителями я мог видеть только console-consumer-65334. Я не уверен, что происходит, не могли бы вы подсказать, почему это могло произойти? - person vishnu viswanath; 23.06.2016
comment
Вы используете Kafka 0.8 или 0.9? 0.9 не передает ZK по умолчанию. У вас включена контрольная точка? Если нет, мы фиксируем только смещение с интервалом в 60 секунд. Так что, если вы отменили задание раньше, вы не увидите его в ZK. - person Robert Metzger; 23.06.2016
comment
Я использую 0.9, у меня еще не включена контрольная точка, но я выполнял задание более 20 минут один раз. - person vishnu viswanath; 23.06.2016
comment
Хорошо, значит, Кафка вообще не будет писать в ZK. - person Robert Metzger; 24.06.2016
comment
Ok. Спасибо. Я включу контрольные точки и тест. Но я думал, вы сказали, что даже если контрольная точка не включена, смещение фиксируется каждые 60 секунд, и я не нашел никаких смещений даже после выполнения ›60 с. - person vishnu viswanath; 27.06.2016
comment
Думаю, ответ не решит проблему. У меня та же проблема (одно и то же сообщение отображается после запуска), хотя у меня включены контрольные точки. Я использую Kafka 0.10, а Flink знает только о моем брокере Kafka (новом клиенте), а не о ZK, если это важно. Я использую BucketingSink, такая же проблема с RollingSink. - person static-max; 13.09.2016