Временная метка контекста в DataStream, созданная операцией Window

Допустим, у меня есть inputStream, и я выполняю над ним некоторую оконную операцию. Какая метка времени для события, созданного при выполнении над ним некоторой оконной операции.

....
DataStream<Integer> inputStream = // ...
DataStream<Integer> countStream = inputStream.keyBy(0)
    .timeWindow(time.Seconds(1))
    .sum();
DataStream<Integer> maxStream = inputStream.keyBy(0)
    .timeWindow(time.Seconds(1))
    .max();

Теперь я хочу объединить потоки countStream и maxStream, чтобы найти все временные метки, при которых countStream за последнюю секунду был равен maxStream.

ПРИМЕЧАНИЕ. Это не совсем та проблема, которую я пытаюсь решить, но это типичный пример. Решение этой проблемы поможет мне решить настоящую проблему, которую мне нужно решить.


person Prannay Khosla    schedule 22.03.2019    source источник


Ответы (2)


В случае, если временные окна являются временными окнами событий, события, которые они генерируют, будут отмечены как произошедшие в конце окна. В случае временных окон обработки у событий не будет временных меток, и в качестве источника информации о времени будут использоваться часы времени ЦП.

Обновлять:

Временные окна в Flink выровнены по эпохе - они не относятся к первому событию или чему-то в этом роде. Вам гарантируется, что два временных окна событий с одинаковой продолжительностью и смещением, например, два вращающихся окна длиной в одну секунду, будут собирать события за один и тот же интервал времени.

Поток событий, излучаемый временным окном события, сам по себе является потоком с временными метками событий и может быть дополнительно обработан окнами, как и любой другой поток событий с временными метками. Просто имейте в виду, что все события, созданные одним экземпляром окна (то есть за одну и ту же секунду), будут иметь одинаковую метку времени. Итак, если вы следуете за окном в 1 секунду с более коротким окном, например, 100 мс, то в 9 раз из 10 более короткое окно не увидит никаких событий.

person David Anderson    schedule 22.03.2019
comment
Если я использую время события, тогда, когда у меня есть два входных потока и я timeWindow, скажем, на 1 секунду (как указано выше), есть ли у меня какие-либо гарантии, что интервалы, используемые для обоих потоков, идентичны? т.е. для countStream, если есть интервал окна [start_time, end_time], то необходимо ли иметь такой же интервал для maxStream. Если это не так, то как я могу объединить два потока? - person Prannay Khosla; 25.03.2019

  • Для переворачивающихся окон, зависящих от времени, интервал составляет 1 секунду. Давайте посмотрим, есть ли диапазон окон в [start_ts, end_ts), и он будет выдавать результаты с отметкой времени end_ts - 1 (то есть start_ts - 999 < / em> причина end_ts здесь равна start_ts + 1000).
  • Для senoraios, зависящих от времени процесса, информация о временной метке отсутствует. сгенерировано.

Обновлять:

Если вы хотите, чтобы элементы из одного окна восходящего потока попадали в одно и то же окно нисходящего потока. Вы можете использовать Последовательная оконная операция.

Шаги:

  1. объедините ваш countStream с maxStream на union.
  2. окно совмещенных потоков с windowAll.
  3. выполните свою бизнес-логику в какой-нибудь process функции, следующей за windowAll.
person LeoZhang    schedule 25.03.2019