Короче говоря, я хотел бы повторно запустить конвейер Flink для данных в Kafka с самого начала.
Флинк 0.10.2, Кафка 0.8.2.
У меня есть тема твитов в Kafka с удержанием 2 часа и конвейер в Flink, который считает твиты со скользящим окном 5 минут каждые 10 секунд.
Если я прерву конвейер и снова запущу его, я бы хотел, чтобы он перечитал старые твиты, тем самым выдав количество твитов за 5 минут. Вместо этого он, кажется, перезапускается с вновь поступивших твитов, поэтому требуется 5 минут, чтобы счетчик перешел в режим.
Я пробовал и auto.offset.reset = smallest/earliest
, и group.id
, но безуспешно. Я также попытался вручную изменить смещения в Kafka, как описано здесь: https://metabroadcast.com/blog/resetting-kafka-offsets
Затем я предполагаю, что проблема может быть связана с контрольной точкой Flink, но я понятия не имею / не могу найти информацию о том, как ее сбросить.
Кто-нибудь может поделиться рабочим кодом? Спасибо, Э.