Использование одного и того же ключа сообщает о разном количестве записей для groupBy () и groupByKey () в приложении Kafka Streaming.

Когда мы используем настраиваемые значения groupByKey (), при обработке потока мы сталкиваемся с «потерянными пакетами». У нас есть единственный процессорный узел с исходной темой, из которой мы читаем пакеты, выполняем группировку и агрегацию в этой группе и выводим данные на основе вычислений, требующих доступа к хранилищу состояний.

Позвольте мне более подробно рассказать о проблеме и о том, как мы пытались ее понять до сих пор, ниже:

Обзор Мы настраиваем приложение Kafka Streams, в котором мы должны выполнять оконные операции. Мы группируем устройства по определенному ключу. Ниже приведены примеры столбцов, которые мы используем для GroupBy:

+---------+---------+------+
| Field Name | Field Value |
+---------+---------+------+
|       A    |    12       | 
|       B    |    abc      |  
|       C    |    x13      |
+---------+---------+------+

Пример ключа на основе приведенных выше данных: 12abcx13, где ключ = Поле (A) + Поле (B) + Поле (C)

Проблема Получение разного количества записей в двух сценариях для одного и того же ключа При указании ключа самостоятельно с помощью groupBy () Использование groupByKey () для группировки данных по ключу разделения «Входная тема Kafka».

Описание Мы впервые использовали функцию groupBy () потоков Kafka для группировки устройств с помощью указанного выше ключа. В этом случае приложение потоков удалило несколько записей и произвело меньше записей, чем ожидалось. Однако, когда мы не указали нашу собственную настраиваемую группировку с помощью функции groupBy (), а вместо этого использовали groupByKey () для ввода данных в исходный входящий ключ раздела Kafka, мы получили точное количество ожидаемых записей.

Чтобы убедиться, что мы использовали те же самые ключи, что и тема ввода для нашей пользовательской функции groupBy (), мы сравнили оба ключа в коде. Ключ темы ввода и настраиваемый ключ были точно такими же.

Итак, теперь мы пришли к выводу, что существует некоторая внутренняя функциональность функции groupBy, которую мы не можем понять, из-за чего функция groupBy и функция groupByKey сообщают о разных счетчиках для одного и того же ключа. Мы просмотрели несколько форумов, но не смогли понять причину этого явления.

Фрагмент кода:

С groupBykey ()

KStream<String, Output> myStream = this.stream
.groupByKey()       
.windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime)).advanceBy(Duration.ofMinutes(advanceByTime)).grace(Duration.ofSeconds(gracePeriod)))
.reduce((value1, value2) -> value2)      
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.transform(new myTransformer(this.store.name(), this.store.name());

С groupBy ():

KStream<String, Output> myStream = this.stream
.groupBy((key, value) -> value.A + value.B + value.C,
         Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime)).advanceBy(Duration.ofMinutes(advanceByTime)).grace(Duration.ofSeconds(gracePeriod)))
.reduce((value1, value2) -> value2)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.transform(new myTransformer(this.store.name()), this.store.name());

Настройка кластера Kafka

----------------------------
| No. of Nodes |    3      |
----------------------------
|  CPU Cores   |    2      | 
----------------------------
|     RAM      |    8 GB   |  
----------------------------

Приложение для потоковой передачи

-----------------------------------------
| Kafka Streams Version |    2.3.0      |
-----------------------------------------
|  Java Version         |     11        | 
-----------------------------------------

person Saad Rasool    schedule 10.09.2020    source источник


Ответы (1)


Если key == value.A + value.B + value.C обе программы должны делать то же самое. Не уверен, почему вы хотите использовать groupBy(): он перезаписывает key с тем же значением?

Разница между обеими программами в том, что groupBy() приведет к (ненужному) разделу на разделы по сравнению с groupByKey(). Как правило, повторное разбиение на разделы может привести к нарушению порядка данных, что может повлиять на последующую обработку. Однако в вашем случае, когда вы не меняете ключ, нарушение порядка не должно вводиться, поскольку раздел записи не должен изменяться. Единственный случай, о котором я могу думать, - это то, что ваше восходящее приложение использует другую стратегию разделения по сравнению с Kafka Streams (обратите внимание, что не все инструменты используют то же хеш-разделение, что и клиенты Java).

person Matthias J. Sax    schedule 17.01.2021