Я пишу поток данных (Beam SDK 2.0.0), который читает из Pub/Sub, подсчитывает элементы в окне, а затем сохраняет подсчеты в BigTable в виде временных рядов. Окна фиксируются на длительности в 1 минуту.
Мое намерение состояло в том, чтобы обновлять значение текущего окна каждую секунду с помощью триггера, чтобы получать обновления в реальном времени в текущем временном окне.
Но это, кажется, не работает. Значение корректно обновляется каждую секунду, но как только поток данных начинает работать в следующую минуту, первое значение обновляется до нуля. Так что в основном только мое последнее значение верно, все остальное равно нулю.
Pipeline pipeline = Pipeline.create(options);
PCollection<String> live = pipeline
.apply("Read from PubSub", PubsubIO.readStrings()
.fromSubscription("projects/..."))
.apply("Window per minute",
Window
.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.orFinally(AfterWatermark.pastEndOfWindow()))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO)
);
Я пробовал играть с кодом триггера, но ничего не помогает. Мой единственный вариант сейчас — удалить весь блок .trigger
. Кто-нибудь сталкивался с подобным поведением?