Триггерные элементы ровно один раз с использованием фиксированного окна с Apache Beam

Я читаю данные из Google pub-sub и помещаю их в фиксированное окно продолжительностью 5 минут. Но - данные не запускаются правильно. Я пробовал несколько комбинаций, похоже, ничего не работает. Это выглядит довольно просто, но я не могу понять это правильно.

Пример использования -

  1. Чтение данных из pub-sub
  2. Окружите их за 5 минут
  3. Выполните агрегирование по истечении 5-минутного окна.
  4. Допускается срок опоздания 1 день.

Попытки:

1. Использование AfterWatermark.pastEndOfWindow для запуска. Это вообще не производит никакого вывода. По подписке было прочитано около 1000 сообщений, но окно не выводило ни одного сообщения.

Window.<EventModel>into(
                FixedWindows.of(Duration.standardMinutes(5)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.standardDays(1), Window.ClosingBehavior.FIRE_ALWAYS)
                .discardingFiredPanes();

2. Использование глобального окна: работает правильно. Но здесь используется GlobalWindows, но мне нужно реализовать фиксированное оконное управление.

Window<EventModel> window = Window.<OrderEvent>
                into(new GlobalWindows())
                .triggering(
                        Repeatedly.forever( 
              AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardDays(1));

Я пробовал другие комбинации, которые используют - Ранний или поздний обжиг - которые запускают некоторые элементы, но не подходят для моего варианта использования - мне не нужны ранние или поздние обжигы - просто нужны результаты каждые 5 минут.

Любой ввод был бы действительно полезен, я безуспешно потратил на это слишком много времени.


person kylebutters    schedule 01.08.2020    source источник


Ответы (1)


Нашел проблему:

Это была ошибка DirectRunner. По какой-то причине - прямой бегун не продвигал водяной знак и, следовательно, ничего не запускалось.

Приведенный ниже код работал правильно - с Dataflow Runner - элементы запускались после окончания окна.

Window<MyModel> window = Window.<MyModel>into(FixedWindows.of(Duration.standardMinutes(10)))
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .withAllowedLateness(Duration.standardDays(1))
                    .discardingFiredPanes();
person kylebutters    schedule 02.08.2020