FlinkKafkaConsumer082 настройка auto.offset.reset не работает?

У меня есть потоковая программа Flink, которая считывает данные из топика Kafka. В программе auto.offset.reset установлен на «наименьший». При тестировании в IDE / Intellij-IDEA программа всегда могла читать данные с начала темы. Затем я настроил кластер flink / kafka и произвел некоторые данные в тему kafka. В первый раз, когда я запустил задание потоковой передачи, оно могло читать данные с самого начала темы. Но после этого я остановил задание потоковой передачи и запустил его снова, оно не будет читать данные с начала темы. Как сделать так, чтобы программа всегда считывала данные с начала темы?

    Properties properties = new Properties();
    properties.put("bootstrap.servers", kafkaServers);
    properties.put("zookeeper.connect", zkConStr);
    properties.put("group.id", group);
    properties.put("topic", topics);
    properties.put("auto.offset.reset", offset);

    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer082<String>(topics, new SimpleStringSchema(), properties));

person Jun    schedule 15.12.2015    source источник
comment
Параметр auto.offset.reset используется FlinkKafkaConsumer081, а не FlinkKafkaConsumer082. Поэтому мне интересно, как вы можете его установить? Кроме того, возможные значения - самые последние или самые ранние (а не самые маленькие. Можете ли вы поделиться своим кодом?   -  person Matthias J. Sax    schedule 15.12.2015
comment
Добавлен код, есть ли соответствующее свойство смещения в FlinkKafkaConsumer082?   -  person Jun    schedule 15.12.2015
comment
Что такое offset? offset = "smalles"? Может быть, ваша тема больше не содержит исходных данных из-за того, что вы превысили срок хранения? Не могли бы вы проверить с kafka-console-consumer.sh, чтобы прочитать тему с самого начала?   -  person Till Rohrmann    schedule 15.12.2015
comment
смещение - наименьшая строка. Проверил с помощью kafka-console-consumer.sh, исходные данные все еще в kafka.   -  person Jun    schedule 15.12.2015
comment
Возможные значения для auto.offset.reset - самые поздние или самые ранние (наименьшее - недействительно).   -  person Matthias J. Sax    schedule 15.12.2015
comment
Он не работает с auto.offset.reset как раньше.   -  person Jun    schedule 15.12.2015


Ответы (1)


Если вы хотите всегда читать с самого начала, вам необходимо отключить контрольные точки в контексте вашего потока.

Также отключите его на уровне потребительских свойств:

enable.auto.commit = false или auto.commit.enable = false (зависит от версии kafka)

Другой способ: вы можете сохранить ckeckpointing для аварийного переключения, но сгенерировать новый group.id, когда вам нужно прочитать с самого начала (просто очистите иногда zookeeper)

person Anatoly Deyneka    schedule 15.12.2015
comment
Привет, я не мог работать на первом месте. Но второй способ, который вы упомянули, всегда работает. - person Jun; 15.12.2015
comment
это означает, что смещения сохраняются в zookeeper (вы можете проверить это в zookeeper или с помощью Kafka Consumer Offset Checker). Ваша цель - отключить это. Потребитель Kafka Flink глубоко интегрируется с механизмом контрольных точек Flink, чтобы гарантировать, что записи, прочитанные из Kafka, обновляют состояние Flink ровно один раз. Отключения flink-checkpointning должно быть достаточно. Предоставьте полный код, как вы настраиваете свой env - person Anatoly Deyneka; 15.12.2015
comment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Существует всего несколько API-интерфейсов enableCheckpointing, но я не вызываю их в своей программе. - person Jun; 15.12.2015
comment
issues.apache.org/jira/browse/FLINK-2974 - добавили смещение коммитера, если контрольная точка отключена. Похоже, вы не можете отключить автоматическую фиксацию, см. Изменения для этой проблемы - github. com / apache / flink / pull / 1341 / files - person Anatoly Deyneka; 15.12.2015