Flink + Kafka сбросить контрольную точку и смещение

Короче говоря, я хотел бы повторно запустить конвейер Flink для данных в Kafka с самого начала.

Флинк 0.10.2, Кафка 0.8.2.

У меня есть тема твитов в Kafka с удержанием 2 часа и конвейер в Flink, который считает твиты со скользящим окном 5 минут каждые 10 секунд.

Если я прерву конвейер и снова запущу его, я бы хотел, чтобы он перечитал старые твиты, тем самым выдав количество твитов за 5 минут. Вместо этого он, кажется, перезапускается с вновь поступивших твитов, поэтому требуется 5 минут, чтобы счетчик перешел в режим.

Я пробовал и auto.offset.reset = smallest/earliest, и group.id, но безуспешно. Я также попытался вручную изменить смещения в Kafka, как описано здесь: https://metabroadcast.com/blog/resetting-kafka-offsets

Затем я предполагаю, что проблема может быть связана с контрольной точкой Flink, но я понятия не имею / не могу найти информацию о том, как ее сбросить.

Кто-нибудь может поделиться рабочим кодом? Спасибо, Э.


person ecesena    schedule 20.02.2016    source источник
comment
Думаю, вам нужно использовать точки сохранения. Они будут доступны в следующем выпуске 1.0 (или вы можете проверить текущий мастер): ci.apache.org/projects/flink/flink-docs-master/apis/streaming/   -  person Matthias J. Sax    schedule 20.02.2016
comment
Это было бы еще лучше, но я бы также согласился перечитать все из Kafka с самого начала и пересчитать то, что является последним окном.   -  person ecesena    schedule 22.02.2016


Ответы (1)


Чтобы перечитать все, что доступно в теме Kafka, достаточно установить для нового «group.id» и «auto.offset.reset» значение «самый ранний».

Если это не сработает, что-то не так.

person Robert Metzger    schedule 21.02.2016
comment
Я так и думал, но Flink кажется более сложным, поскольку он управляет внутренним состоянием и отправляет его копию в zookeeper. - person ecesena; 22.02.2016
comment
Что делает Flink: он действительно сохраняет внутреннее состояние последних считанных смещений. Если задание Flink по какой-либо причине завершается неудачно, оно восстанавливается из этого состояния. Это позволяет пользователям выполнять последующие операции с семантикой ровно один раз. Flink использует механизм распределенных моментальных снимков для периодического резервного копирования состояния. Как только распределенный снимок подтвержден всеми операторами, источник Kafka также фиксирует смещения в ZK. Таким образом, пользователи могут перезапустить задание со смещения в ZK. Обратите внимание, что механизм от ZK не дает гарантии ровно один раз. - person Robert Metzger; 22.02.2016
comment
спасибо за подробности - знаете ли вы, есть ли способ переопределить состояние Flink? (или аналогично принудительному использованию смещения ZK?) - person ecesena; 23.02.2016
comment
Состояние сбрасывается, когда задание останавливается или отменяется. Невозможно изменить значения в состоянии или удалить его вручную. Но вы можете просто отменить и запустить задание, и оно получит смещение от ZK (при условии, что group.id такой же, а смещение все еще доступно в kafka) - person Robert Metzger; 24.02.2016