Сессия Windows во Flink дает неожиданные результаты

У меня есть поток записей, которые «привязаны» к двум полям, а затем назначается окно сеанса с перерывом в 30 секунд. Я использую «отметку времени», прикрепленную к записям, как время события. Я использую водяной знак assignAscendingTimestamps.

Рассмотрим, например, следующие записи. В потоке используется ключ (пользователь, место).

Запись1: пользователь1, место1, отметка времени t1

Запись2: пользователь2, место1, отметка времени через 30 секунд после t1

Record3: user1, place1, timestamp в пределах 30 секунд после t1

Record4: user1, place1, отметка времени через 30 секунд после t1

Record2 принадлежит пользователю user2 и, следовательно, принадлежит другому сегменту, так как поток имеет ключ. Следовательно, я ожидал, что Record1, Record3 и Record4 принадлежат одному ведру, а Record2 - другому ведру.

Bucket1

Запись1: пользователь1, место1, отметка времени t1

Record3: user1, place1, timestamp в пределах 30 секунд после t1

Record4 - user1, place1, отметка времени через 30 секунд после t1

Bucket2

Запись2: пользователь2, место1, отметка времени через 30 секунд после t1

Насколько я понимаю, окно сеанса, содержащее Record1 и Record3, будет запускаться только по прибытии Record4. Но когда я запускаю код, сеанс, содержащий только Record1, запускается, когда приходит Record2, поскольку временная метка Record2 находится после временного интервала (30 секунд) временной метки Record1, хотя ключ Record2 отличается. Я просмотрел документацию Flink и несколько примеров, которые я смог найти в Интернете для Session Windows. Но я не могу решить эту проблему. Что-то мне здесь не хватает? Может быть, это из-за водяного знака возрастающих отметок времени?


person avidlearner    schedule 18.05.2017    source источник


Ответы (1)


Проблема в том, что assignAscendingTimestamps требует, чтобы ваши временные метки монотонно увеличивались по всем ключам. Причина этого в том, что Flink не может создавать водяные знаки по ключевым словам.

Обновлять

Поскольку Flink не может генерировать водяные знаки для каждого ключа, необходимо создавать водяные знаки, чтобы они действовали для всех элементов. Если временные метки монотонны для каждого ключа, но не для всех ключей, то вам необходимо определить максимальную неупорядоченность (разницу во временных метках) между двумя ключами. Вычитая это неупорядоченно из метки времени элемента, вы получите действительный водяной знак. См. Также BoundedOutOfOrdernessTimestampExtractor. Однако имейте в виду, что если элементы прибывают с большим нарушением порядка, это также сломается.

person Till Rohrmann    schedule 18.05.2017
comment
Спасибо за ответ Тилль. Мне нужно обработать записи, поступающие с пользовательских устройств. Метка времени, прикрепляемая к записям, зависит от часов устройства. Моя обработка должна быть отдельной для каждого пользователя, и я могу использовать только отметку времени устройства в качестве времени события, потому что отметки времени для разных пользователей не будут синхронизироваться. Если Flink не может сгенерировать водяные знаки по ключевым словам, моя обработка при таком подходе пойдет не так. Какой подход вы бы посоветовали в таких случаях? - person avidlearner; 19.05.2017