Flink streaming - декартово произведение и оконное отображение потоков

Предположим, у меня есть поток с ключами и отметками времени. Я хочу создать декартово произведение этих ключей в каждом окне (скользящее окно). Если у меня есть ключи 1,2,3,4 и я установил параллелизм на два, я хочу «сгруппировать» их следующим образом:

1 - 2    2 - 3
1 - 3    2 - 4
1 - 4    3 - 4

И я хочу обрабатывать элементы для каждой группы в каждом окне. Итак, представьте, что указанные выше элементы (1,2,3,4) находятся в одном окне в зависимости от их временных меток.

В простейшей форме мой вопрос: учитывая некоторые элементы в каждом скользящем окне (которое может содержать несколько ключей), я хочу создать комбинации этих ключей, как в приведенном выше примере, и применить собственный алгоритм к этим сгруппированным элементам.

Что я пробовал до сих пор, так это использование

.assignAscendingTimestamps(...)
.keyBy(...)
.timeWindow(Time.seconds(5),Time.seconds(5))
.apply(...)

но это будет применять только алгоритм для каждого ключа, плюс невозможно создать комбинации.

PS: Я видел этот документ: https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams

и я думаю, что упомянутые здесь дискретизированные потоки сработают, но они недоступны во flink 1.2 (или любой другой версии).


person Al Jenssen    schedule 10.02.2017    source источник
comment
Возможно, я не понимаю, но учитывая некоторые элементы в каждом скользящем окне (которое может содержать несколько ключей) не может работать с данным примером, потому что вы создаете timeWindow на KeyedStream. Таким образом, все элементы в вашем timeWindow будут иметь один и тот же ключ.   -  person Patze    schedule 13.02.2017
comment
Вы правильно поняли. Я хочу, чтобы в каждом скользящем окне было несколько клавиш. Учитывая поток, я хочу создать все возможные пары ключей и обработать эти пары вместе в скользящем окне.   -  person Al Jenssen    schedule 13.02.2017


Ответы (1)


keyBy() в контексте потоковой передачи имеет эффект, сравнимый с groupBy() в контексте пакетной обработки. На основе KeySelector, который вы предоставляете, поток делится на несколько подпотоков. Затем они загружаются в ваш timeWindow. Таким образом, то, что вы сейчас делаете в своем примере, - это timeWindow на KeyedStream. Все элементы, которые попадают в ваш timeWindow и передаются в функцию, которую вы применяете впоследствии, будут иметь точно такой же ключ. Вот хорошее введение в работу с окнами в Flink, в котором более подробно рассказывается, как использовать окна https://flink.apache.org/news/2015/12/04/Introduction-windows.html. Если ваши ключи содержат семантическую информацию и вы хотите, чтобы в одном окне были разные ключи, вы можете создать искусственный ключ (например, простое целое число), который вы назначаете своим записям. На основе этого ключа у вас будет больше контроля над группировкой и, следовательно, секционированием потока. Для вашего окончательного вычисления, которое берет результаты предыдущих шагов и объединяет их, вам понадобится дополнительный шаг (например, join или reduce). Взгляните на первый пример здесь: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html Он подсчитывает все элементы с заданным ключом в течение timeWindow 5 секунд с использованием функции sum(). Результат будет содержать по одному элементу на клавишу для каждого окна.

person Patze    schedule 14.02.2017
comment
Если ваши ключи содержат семантическую информацию и вы хотите, чтобы в одном окне были разные ключи, вы можете создать искусственный ключ (например, простое целое число), который вы назначаете своим записям. Это очень логичный подход, но что произойдет, если вам понадобятся все возможные пары между ключами? Как это можно реализовать с помощью Flink? - person Al Jenssen; 14.02.2017
comment
Если вы посмотрите на результат, который вы получите от операции sum() в примере из документации Flink, вы получите `DataStream‹ Tuple2 ‹String, Integer ›› _ 2_sum ()` возвращает Datastream и, таким образом, вы можете выполнить любое преобразование в результат, разрешенный для Datastreams, например map() или flatMap. Документация по выпуску находится здесь ci.apache.org/ проекты / flink / flink-docs-release-1.2 / dev / дают вам хороший обзор того, какие операции могут выполняться с какими входными данными и что в итоге получается из данных. - person Patze; 15.02.2017