Пакетный потребитель Kafka без повторяющихся записей

У меня следующие требования: мы читаем из реляционной базы данных с CDC вставкой / обновлением в определенную таблицу и импортируем их как события в тему Kafka.

например jdbc-источник-тема

|---------------------|------------------|------------------|
|      Timestamp      |        ID        |      Column      |
|---------------------|------------------|------------------|
|        10:00        |         1        |         A        |
|---------------------|------------------|------------------|
|        10:01        |         2        |         B        |
|---------------------|------------------|------------------|
|        10:01        |         1        |         C        |
|---------------------|------------------|------------------|

В конце конвейера мы хотели бы использовать эти события один раз в день и избежать дублирования для одного и того же идентификатора.

например целевая тема

|---------------------|------------------|------------------|
|      Timestamp      |        ID        |      Column      |
|---------------------|------------------|------------------|
|        10:01        |         2        |         B        |
|---------------------|------------------|------------------|
|        10:01        |         1        |         C        |
|---------------------|------------------|------------------|

На мой взгляд, лучшее решение - иметь потребителя с group_id (чтобы смещение сохранялось в kafka на следующий день), который запускался бы один раз. Но это означает, что каждый раз, когда потребитель запускается, он должен отбрасывать дубликаты из выбранных записей.

Учитывая, что эта таблица может использоваться также для KSQL Joins в будущем, мне интересно, существует ли лучший подход с использованием запросов KSQL, чтобы потребитель мог извлекать из очищенной темы с одной записью для каждого ключа.


person Mattia Fantoni    schedule 02.06.2020    source источник


Ответы (1)


Если единственным потребителем этих данных является ksqlDB, вам может не потребоваться де-дублирование, поскольку ksqlDB будет правильно обрабатывать несколько обновлений одного и того же ключа, если вы импортируете тему в виде ТАБЛИЦЫ в ksql, то есть вместо того, чтобы делать:

CREATE STREAM FOO (... columns ...) WITH (...);

Do:

CREATE TABLE FOO (... columns ...) WITH (...);

В настоящее время, когда ksqlDB обрабатывает такой журнал изменений, он выводит все / некоторые дубликаты, в зависимости от того, как вы настроили cache.max.bytes.buffering.

Вы можете избежать генерации дубликатов, используя 24-часовое окно и добавив Отключить поддержку. А пока, если вы хотите удалить дубликаты, как вы предложили. Вы также можете заставить что-то работать, написав собственное приложение Kafka Streams, чтобы материализовать таблицу в хранилище состояний и использовать api подавления для удаления дубликатов.

Однако стоит отметить, что семантически дубликаты не вызывают никаких проблем. Результат материализации журнала изменений в таблице правильный с дубликатами и без них. Итак, как я сказал в начале, удаление дубликатов может даже не потребоваться.

person Andrew Coates    schedule 02.06.2020
comment
Но насколько я знаю, чтобы создать таблицу из потока, мне нужна группа по выражению, учитывая, что таблица имеет те же столбцы потока, единственным решением было бы группировать по каждому столбцу с окончательным счетом, но это не так. t отбрасывать дубликаты на ключевом уровне только на уровне строки. Я что-то упускаю? - person Mattia Fantoni; 05.06.2020
comment
Я предлагаю вам не начинать с импорта вашей темы как потока, а импортировать ее как таблицу, то есть использовать CREATE TABLE, а не CREATE STREAM для импорта ваших данных. - person Andrew Coates; 12.06.2020