Подсчет различных значений в потоковом конвейере

У меня есть конвейер, который выглядит как

pipeline.apply(PubsubIO.read.subscription("some subscription"))
            .apply(Window.into(SlidingWindow.of(10 mins).every(20 seconds)
                            .triggering(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(20 seconds))
                    .withAllowedLateness(Duration.ZERO)
                    .accumulatingFiredPanes()))
            .apply(RemoveDuplicates.create())
            .apply(Window.discardingFiredPanes()) // this is suggested in the warnings under https://cloud.google.com/dataflow/model/triggers#window-accumulation-modes
            .apply(Count.<String>globally().withoutDefaults())

Этот конвейер значительно пересчитывает отдельные значения (в 20 раз больше нормального значения). Первоначально я подозревал, что эта проблема может быть вызвана триггером по умолчанию. Я настроил использование триггеров, которые не допускают задержки/отбрасывания запущенных панелей/использования времени обработки, все из которых имеют схожие проблемы с пересчетом.

Я также пробовал ApproximateUnique.globally: он потерпел неудачу во время построения конвейера из-за исключения, похожего на Default values are not supported in Combine.globally() if the output PCollection is not windowed by GlobalWindows. Кажется, нет возможности добавить к нему withoutDefaults (как мы сделали с Count.globally).

Есть ли рекомендуемый способ сделать COUNT(DISTINCT) в конвейере потока данных/потока луча с разумной точностью?

P.S. Я использую Java Dataflow SDK 1.9.0.


person Jiayuan Ma    schedule 14.02.2017    source источник


Ответы (1)


Ваш код выглядит нормально; это не должно преувеличивать. Обратите внимание, что вы помещаете каждый элемент в 30 окон, поэтому, если у вас есть приемник, не поддерживающий окна (эквивалентно сворачиванию всех скользящих окон), вы ожидаете ровно в 30 раз больше элементов. Если бы вы могли показать немного больше конвейера или то, как вы наблюдаете за счетчиками, это может помочь.

Кроме того, у меня есть несколько предложений по конвейеру:

  • Я предлагаю изменить ваш триггер для RemoveDuplicates на AfterPane.elementCountAtLeast(1); это даст вам тот же результат с меньшей задержкой, поскольку более поздние прибывающие элементы не будут иметь никакого влияния. Этот триггер и ваш текущий триггер никогда не будут срабатывать повторно. Так что на самом деле не имеет значения, устанавливаете ли вы accumulatingFiredPanes() или discardingFiredPanes(). Это хорошо, потому что ни один из них не будет работать с остальной частью вашего конвейера.
  • I'd install a new trigger prior to the Count. The reason is a bit technical, but I'll try to describe it:
    • In your current pipeline, the trigger installed there (the "continuation trigger" of the trigger for RemoveDuplicates) notes the arrival time of the first element and waits until it has received all elements that were produced at or before that processing time, as measured by the upstream worker. There is some nondeterminism because it puns the local processing time and the processing time of other workers.
    • Если вы последуете моему совету и переключите триггер на RemoveDuplicates, тогда триггером продолжения будет AfterPane.elementCountAtLeast(1), поэтому он всегда будет выдавать счетчик как можно скорее, а затем отбрасывать дальнейшие данные, что очень неправильно.
person Kenn Knowles    schedule 14.02.2017
comment
Спасибо за Ваш ответ. Я использую графит для наблюдения за счетами и все еще изучаю, что может пойти не так. Для вашего первого пункта: будет ли AfterPane.elementCountAtLeast(1) уменьшаться? Учитывая, что этот триггер будет выдавать окна всякий раз, когда есть хотя бы один элемент, поэтому RemoveDuplicate большую часть времени будет иметь только один элемент? - person Jiayuan Ma; 14.02.2017
comment
Если бы вы использовали Repeatedly.forever(AfterPane.elementCountAtLeast(1)), вы бы получили много выходных данных, в основном только одного элемента. Но только с AfterPane.elementCountAtLeast(1)) он закроет окно после первого вывода, отбрасывая остальную часть ввода для этой клавиши. - person Kenn Knowles; 16.02.2017