Когда мы используем настраиваемые значения groupByKey (), при обработке потока мы сталкиваемся с «потерянными пакетами». У нас есть единственный процессорный узел с исходной темой, из которой мы читаем пакеты, выполняем группировку и агрегацию в этой группе и выводим данные на основе вычислений, требующих доступа к хранилищу состояний.
Позвольте мне более подробно рассказать о проблеме и о том, как мы пытались ее понять до сих пор, ниже:
Обзор Мы настраиваем приложение Kafka Streams, в котором мы должны выполнять оконные операции. Мы группируем устройства по определенному ключу. Ниже приведены примеры столбцов, которые мы используем для GroupBy:
+---------+---------+------+
| Field Name | Field Value |
+---------+---------+------+
| A | 12 |
| B | abc |
| C | x13 |
+---------+---------+------+
Пример ключа на основе приведенных выше данных: 12abcx13, где ключ = Поле (A) + Поле (B) + Поле (C)
Проблема Получение разного количества записей в двух сценариях для одного и того же ключа При указании ключа самостоятельно с помощью groupBy () Использование groupByKey () для группировки данных по ключу разделения «Входная тема Kafka».
Описание Мы впервые использовали функцию groupBy () потоков Kafka для группировки устройств с помощью указанного выше ключа. В этом случае приложение потоков удалило несколько записей и произвело меньше записей, чем ожидалось. Однако, когда мы не указали нашу собственную настраиваемую группировку с помощью функции groupBy (), а вместо этого использовали groupByKey () для ввода данных в исходный входящий ключ раздела Kafka, мы получили точное количество ожидаемых записей.
Чтобы убедиться, что мы использовали те же самые ключи, что и тема ввода для нашей пользовательской функции groupBy (), мы сравнили оба ключа в коде. Ключ темы ввода и настраиваемый ключ были точно такими же.
Итак, теперь мы пришли к выводу, что существует некоторая внутренняя функциональность функции groupBy, которую мы не можем понять, из-за чего функция groupBy и функция groupByKey сообщают о разных счетчиках для одного и того же ключа. Мы просмотрели несколько форумов, но не смогли понять причину этого явления.
Фрагмент кода:
С groupBykey ()
KStream<String, Output> myStream = this.stream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime)).advanceBy(Duration.ofMinutes(advanceByTime)).grace(Duration.ofSeconds(gracePeriod)))
.reduce((value1, value2) -> value2)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.transform(new myTransformer(this.store.name(), this.store.name());
С groupBy ():
KStream<String, Output> myStream = this.stream
.groupBy((key, value) -> value.A + value.B + value.C,
Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime)).advanceBy(Duration.ofMinutes(advanceByTime)).grace(Duration.ofSeconds(gracePeriod)))
.reduce((value1, value2) -> value2)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.transform(new myTransformer(this.store.name()), this.store.name());
Настройка кластера Kafka
----------------------------
| No. of Nodes | 3 |
----------------------------
| CPU Cores | 2 |
----------------------------
| RAM | 8 GB |
----------------------------
Приложение для потоковой передачи
-----------------------------------------
| Kafka Streams Version | 2.3.0 |
-----------------------------------------
| Java Version | 11 |
-----------------------------------------