У меня есть конвейер, который выглядит как
pipeline.apply(PubsubIO.read.subscription("some subscription"))
.apply(Window.into(SlidingWindow.of(10 mins).every(20 seconds)
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(20 seconds))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes()))
.apply(RemoveDuplicates.create())
.apply(Window.discardingFiredPanes()) // this is suggested in the warnings under https://cloud.google.com/dataflow/model/triggers#window-accumulation-modes
.apply(Count.<String>globally().withoutDefaults())
Этот конвейер значительно пересчитывает отдельные значения (в 20 раз больше нормального значения). Первоначально я подозревал, что эта проблема может быть вызвана триггером по умолчанию. Я настроил использование триггеров, которые не допускают задержки/отбрасывания запущенных панелей/использования времени обработки, все из которых имеют схожие проблемы с пересчетом.
Я также пробовал ApproximateUnique.globally
: он потерпел неудачу во время построения конвейера из-за исключения, похожего на Default values are not supported in Combine.globally() if the output PCollection is not windowed by GlobalWindows.
Кажется, нет возможности добавить к нему withoutDefaults
(как мы сделали с Count.globally
).
Есть ли рекомендуемый способ сделать COUNT(DISTINCT)
в конвейере потока данных/потока луча с разумной точностью?
P.S. Я использую Java Dataflow SDK 1.9.0.