Принудительное исключение скользящих окон событий для обработки (исторические потоки) на Flink

В настоящее время я использую Flink для исследования движков потоковой обработки. В процессе обучения я работаю с историческими потоками, которые состоят из кортежей следующего вида:

event_time, attribute_1, ..., attribute_X

где event_time используется как TimeCharacteristic.EventTime во время обработки. Кроме того, я вставляю свои наборы данных в топологию обработки, либо: (i) создавая структуры в памяти, либо (ii) читая сами файлы CSV.

К сожалению, я заметил, что даже если в оконном операторе поступило достаточное количество кортежей, которые завершают полное окно, это окно не отправляется вниз по потоку для обработки. В результате производительность значительно падает, и много раз у меня возникает OutOfMemoryError исключение (с большими историческими потоками).

Чтобы проиллюстрировать типичный вариант использования, я представляю следующий пример:

StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);
List<Tuple2<Long, Integer>> l = new ArrayList<>();
    l.add(new Tuple2<>(1L, 11));
    l.add(new Tuple2<>(2L, 22));
    l.add(new Tuple2<>(3L, 33));
    l.add(new Tuple2<>(4L, 44));
    l.add(new Tuple2<>(5L, 55));
    DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
    stream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
                return t.f0;
            }
        })
        .windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2), 
                Time.milliseconds(1)))
        .sum(1)
        .print();
    env.execute();

Согласно содержанию l, мне нужны следующие оконные результаты:

  • [0, 2) Сумма: 11
  • [1, 3) Сумма: 33
  • [2, 4) Сумма: 55
  • [3, 5) Сумма: 77
  • [4, 6) Сумма: 99
  • [5, 7) Сумма: 55

Каждый элемент списка можно прочитать как [отметка времени начала, отметка времени окончания), Sum: X.

Я ожидаю, что Flink будет выдавать оконный результат каждый раз, когда появляется кортеж с отметкой времени, превышающей отметку времени окончания открытого окна. Например, я ожидаю, что суммирование для окна [1, 3) будет произведено, когда кортеж с меткой времени 4L будет передан в оператор окна. Однако обработка начинается, когда все кортежи из l помещаются в топологию потока. То же самое происходит, когда я работаю с более крупными историческими потоками, что приводит к снижению производительности (или даже к истощению памяти).

Вопрос: Как я могу заставить Flink направлять окна вниз по потоку для обработки к моменту завершения окна?

Я считаю, что для SlidingEventTimeWindows выселение окна запускается водяными знаками. Если предыдущее верно, как я могу написать свои топологии, чтобы они запускали окна к моменту поступления кортежа с более поздней меткой времени?

Спасибо


person nick.katsip    schedule 06.02.2018    source источник
comment
Ваш источник на самом деле является потоком, или вы всегда сначала читаете его в память? В вашем примере полный ввод будет считан fromCollection   -  person Joshua DeWald    schedule 07.02.2018
comment
Обычно он читается из файла CSV. Пример, который я опубликовал, представляет собой простой пример того, что происходит внутри Flink.   -  person nick.katsip    schedule 07.02.2018


Ответы (1)


AscendingTimestampExtractor использует стратегию периодической маркировки водяных знаков, в которой Flink будет вызывать метод getCurrentWatermark() каждые n миллисекунд, где n - это autowatermarkinterval.

Интервал по умолчанию составляет 200 миллисекунд, что очень долго по сравнению с размером ваших окон. Однако их нельзя напрямую сравнивать - 200 мсек измеряются во времени обработки, а не во времени события. Тем не менее, я подозреваю, что если вы не изменили этот параметр конфигурации, то будет создано множество окон до того, как будет выпущен первый водяной знак, что, я думаю, объясняет то, что вы видите.

Вы можете уменьшить интервал автоматической установки водяных знаков (возможно, до 1 миллисекунды). Или вы можете реализовать AssignerWithPunctuatedWatermarks, что даст вам больше контроля.

person David Anderson    schedule 06.02.2018
comment
большое спасибо за ваш ответ, и я обязательно попробую. Однако в больших наборах данных (~ 2-4 ГБ) у меня аналогичное поведение. Несмотря на то, что получение этих кортежей занимает намного больше 200 мс, обработка откладывается до тех пор, пока не будут приняты все кортежи. Есть идеи, почему это может происходить? - person nick.katsip; 06.02.2018
comment
Каков источник данных при работе с большими наборами данных? Кафка, файловая система, ...? - person David Anderson; 06.02.2018
comment
Обычно файлы CSV хранятся в файловой системе. Я работаю в основном с историческими потоками. - person nick.katsip; 06.02.2018