Триггер окна Cloud Dataflow перезаписывает значение из закрытого окна

Я пишу поток данных (Beam SDK 2.0.0), который читает из Pub/Sub, подсчитывает элементы в окне, а затем сохраняет подсчеты в BigTable в виде временных рядов. Окна фиксируются на длительности в 1 минуту.

Мое намерение состояло в том, чтобы обновлять значение текущего окна каждую секунду с помощью триггера, чтобы получать обновления в реальном времени в текущем временном окне.

Но это, кажется, не работает. Значение корректно обновляется каждую секунду, но как только поток данных начинает работать в следующую минуту, первое значение обновляется до нуля. Так что в основном только мое последнее значение верно, все остальное равно нулю.

Pipeline pipeline = Pipeline.create(options);

PCollection<String> live = pipeline
        .apply("Read from PubSub", PubsubIO.readStrings()
        .fromSubscription("projects/..."))
        .apply("Window per minute",
            Window
                .<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(Repeatedly
                    .forever(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(1)))                                         
                    .orFinally(AfterWatermark.pastEndOfWindow()))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.ZERO)
            );

Я пробовал играть с кодом триггера, но ничего не помогает. Мой единственный вариант сейчас — удалить весь блок .trigger. Кто-нибудь сталкивался с подобным поведением?


person Wouter    schedule 15.06.2017    source источник


Ответы (1)


После сообщения о моей проблеме в Google они обнаружили некоторые проблемы в Beam SDK, которые вызывают это. Подробнее по этим ссылкам:

Когда таймеры EOW и GC срабатывают вместе (ненулевое допустимое запаздывание), мы не замечаем, что это последняя панель: https://issues.apache.org/jira/browse/BEAM-2505

Таймеры времени обработки не игнорируются должным образом, если они приходят с таймером GC: https://issues.apache.org/jira/browse/BEAM-2502

Таймеры времени обработки просто интерпретируются как таймеры GC, совершенно некорректно сравнивая метки времени из разных временных доменов: https://issues.apache.org/jira/browse/BEAM-2504

person Wouter    schedule 23.06.2017