В настоящее время я использую Flink для исследования движков потоковой обработки. В процессе обучения я работаю с историческими потоками, которые состоят из кортежей следующего вида:
event_time, attribute_1, ..., attribute_X
где event_time
используется как TimeCharacteristic.EventTime
во время обработки. Кроме того, я вставляю свои наборы данных в топологию обработки, либо: (i) создавая структуры в памяти, либо (ii) читая сами файлы CSV.
К сожалению, я заметил, что даже если в оконном операторе поступило достаточное количество кортежей, которые завершают полное окно, это окно не отправляется вниз по потоку для обработки. В результате производительность значительно падает, и много раз у меня возникает OutOfMemoryError
исключение (с большими историческими потоками).
Чтобы проиллюстрировать типичный вариант использования, я представляю следующий пример:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env.setMaxParallelism(1);
List<Tuple2<Long, Integer>> l = new ArrayList<>();
l.add(new Tuple2<>(1L, 11));
l.add(new Tuple2<>(2L, 22));
l.add(new Tuple2<>(3L, 33));
l.add(new Tuple2<>(4L, 44));
l.add(new Tuple2<>(5L, 55));
DataStream<Tuple2<Long, Integer>> stream = env.fromCollection(l);
stream.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
@Override
public long extractAscendingTimestamp(Tuple2<Long, Integer> t) {
return t.f0;
}
})
.windowAll(SlidingEventTimeWindows.of(Time.milliseconds(2),
Time.milliseconds(1)))
.sum(1)
.print();
env.execute();
Согласно содержанию l
, мне нужны следующие оконные результаты:
- [0, 2) Сумма: 11
- [1, 3) Сумма: 33
- [2, 4) Сумма: 55
- [3, 5) Сумма: 77
- [4, 6) Сумма: 99
- [5, 7) Сумма: 55
Каждый элемент списка можно прочитать как [отметка времени начала, отметка времени окончания), Sum: X.
Я ожидаю, что Flink будет выдавать оконный результат каждый раз, когда появляется кортеж с отметкой времени, превышающей отметку времени окончания открытого окна. Например, я ожидаю, что суммирование для окна [1, 3) будет произведено, когда кортеж с меткой времени 4L
будет передан в оператор окна. Однако обработка начинается, когда все кортежи из l
помещаются в топологию потока. То же самое происходит, когда я работаю с более крупными историческими потоками, что приводит к снижению производительности (или даже к истощению памяти).
Вопрос: Как я могу заставить Flink направлять окна вниз по потоку для обработки к моменту завершения окна?
Я считаю, что для SlidingEventTimeWindows
выселение окна запускается водяными знаками. Если предыдущее верно, как я могу написать свои топологии, чтобы они запускали окна к моменту поступления кортежа с более поздней меткой времени?
Спасибо