Управление смещением Kafka и синхронизация с БД

Я работаю над приложением, использующим поток Kafka и базу данных.

В моем приложении я управляю смещением Kafka вручную и фиксирую смещение только в случае успешной обработки сообщения (т.е. после успешной обработки и обновления в БД).

Однако, если после обновления БД мое приложение отключается до фиксации, то, когда оно возвращается, это приводит к дублированию записи в БД из-за незафиксированного смещения.

Я хочу избежать этих дубликатов, но при этом убедиться, что обрабатываю каждое сообщение. Как правильно это сделать?

РЕДАКТИРОВАТЬ: Мое обновление БД в основном увеличивает счетчик записи на некоторое значение. Таким образом, операторы MERGE не подходят.


person The Kat    schedule 24.11.2020    source источник
comment
Для РСУБД используйте оператор MERGE. Для баз данных nosql, таких как Cassandra, повторяющаяся строка с тем же первичным ключом будет просто перезаписана без каких-либо ошибок.   -  person Saptarshi Basu    schedule 24.11.2020
comment
@SaptarshiBasu Спасибо за ответ. Обновили свой вопрос, чтобы показать, почему эти варианты не подходят для меня.   -  person The Kat    schedule 24.11.2020


Ответы (1)


Это немного сложно.

Kafka поддерживает семантику «ровно один раз». Но когда вы записываете данные во внешнее хранилище данных, вам необходимо обеспечить единовременную запись на стороне потребителя.

Один из способов добиться этого (предложенный Джеем Крепсом здесь) - поддерживать смещение Kafka в хранилище данных как часть одной транзакции. Следовательно, если вы сохраняете последнее смещение для каждого раздела, вы всегда можете игнорировать сообщения из данного раздела, когда получаете смещение меньше, чем то, которое хранится в БД.

Однако в этом подходе есть один нюанс. Если у вас есть развертывание «активный-активный» с несколькими центрами обработки данных, где потребители отключаются к другому кластеру центра обработки данных, если основной кластер выходит из строя, вы не можете слепо полагаться на смещение. Смещение - это физический идентификатор, и смещение сообщения в одном кластере может отличаться от смещения реплицированного сообщения в другом кластере.

В этих обстоятельствах я думаю, что правильным подходом будет использование потоков Kafka и поддержание счетчиков в таблице Kafka (KTable), хранящейся в сжатой теме Kafka. Kafka внутренне будет использовать идентификатор производителя, эпоху, идентификатор транзакции и т. Д., Чтобы обеспечить семантику ровно один раз.

person Saptarshi Basu    schedule 24.11.2020