У нас есть платформа для микросервисов, и мы используем debezium для сбора данных об изменениях из баз данных на этих платформах, что хорошо работает.
Теперь мы хотели бы упростить для нас присоединение к этим темам и передачу результатов в новую тему, которая могла бы использоваться несколькими службами.
Отказ от ответственности: это предполагает v0.11 ksqldb и cli (похоже, что многое из этого может не работать в более старых версиях)
Пример двух таблиц из двух экземпляров базы данных, которые транслируются в темы Kafka:
-- source identity microservice (postgres)
CREATE TABLE public.user_entity (
id varchar(36) NOT NULL,
first_name varchar(255) NULL,
PRIMARY KEY (id)
);
-- ksql stream
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');
-- source organization microservice (postgres)
CREATE TABLE public.user_info (
id varchar(36) NOT NULL,
user_entity_id varchar(36) NOT NULL,
business_unit varchar(255) NOT NULL,
cost_center varchar(255) NOT NULL,
PRIMARY KEY (id)
);
-- ksql stream
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');
Вариант 1: потоки
CREATE STREAM stream_user_info_by_user_entity_id
AS SELECT * FROM stream_user_info
PARTITION BY user_entity_id
EMIT CHANGES;
SELECT
user_entity_id,
first_name,
business_unit,
cost_center
FROM stream_user_entity ue
LEFT JOIN stream_user_info_by_user_entity_id ui WITHIN 365 DAYS ON ue.id = ui.user_entity_id
EMIT CHANGES;
Заметьте WITHIN 365 DAYS
, концептуально эти таблицы могут оставаться без изменений очень долгое время, поэтому это окно будет технически бесконечно большим. Это выглядит подозрительно и, кажется, намекает на то, что это не лучший способ сделать это.
Вариант 2: таблицы
CREATE TABLE ktable_user_info_by_user_entity_id (
user_entity_id,
first_name,
business_unit,
cost_center
)
with (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');
SELECT
user_entity_id,
first_name,
business_unit,
cost_center
FROM stream_user_entity ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id
EMIT CHANGES;
Нам больше не нужно окно WITHIN 365 DAYS
, так что это кажется более правильным. Однако это вызывает изменение только тогда, когда сообщение отправляется в поток, а не в таблицу.
В этом примере: Пользователь обновляет first_name - ›изменение отправлено Пользовательские обновления business_unit -› изменения не отправлены
Возможно, есть способ создать объединенный поток, разделенный user_entity_id, и присоединиться к дочерним таблицам, которые будут содержать текущее состояние, что приводит меня к ....
Вариант 3: объединение потока и таблиц
-- "master" change stream with merged stream output
CREATE STREAM stream_user_changes (user_entity_id VARCHAR)
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;
CREATE STREAM stream_user_entity_by_id
AS SELECT * FROM stream_user_entity
PARTITION BY id
EMIT CHANGES;
CREATE TABLE ktable_user_entity_by_id (
id VARCHAR PRIMARY KEY,
first_name VARCHAR
) with (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');
SELECT
uec.user_entity_id,
ue.first_name,
ui.business_unit,
ui.cost_center
FROM stream_user_entity_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id
EMIT CHANGES;
Этот выглядит лучше всего, но, похоже, в нем много движущихся компонентов для каждой таблицы, у нас есть 2 потока, 1 запрос на вставку, 1 ktable. Другой потенциальной проблемой здесь может быть состояние скрытой гонки, когда поток испускает изменение до того, как таблица будет обновлена под обложками.
Вариант 4: больше объединенных таблиц и потоков
CREATE STREAM stream_user_entity_changes_enriched
AS SELECT
ue.id AS user_entity_id,
ue.first_name,
ui.business_unit,
ui.cost_center
FROM stream_user_entity_by_id ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id
EMIT CHANGES;
CREATE STREAM stream_user_info_changes_enriched
AS SELECT
ui.user_entity_id,
ue.first_name,
ui.business_unit,
ui.cost_center
FROM stream_user_info_by_user_entity_id ui
LEFT JOIN ktable_user_entity_by_id ue ON ui.user_entity_id = ue.id
EMIT CHANGES;
CREATE STREAM stream_user_changes_enriched (user_entity_id VARCHAR, first_name VARCHAR, business_unit VARCHAR, cost_center VARCHAR)
WITH (KAFKA_TOPIC='stream_user_changes_enriched', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_entity_changes_enriched;
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_info_changes_enriched;
Концептуально это то же самое, что и предыдущее, но слияние происходит после объединений. Вероятно, это могло бы устранить любые потенциальные условия гонки, потому что мы выбираем в первую очередь из потоков, а не из таблиц.
Обратной стороной является то, что сложность даже хуже, чем в варианте 3, и запись и отслеживание всех этих потоков для любых объединений с более чем двумя таблицами было бы ошеломляющим ...
Вопрос: какой метод лучше всего подходит для этого варианта использования и / или мы пытаемся сделать что-то, для чего не следует использовать ksql? Может нам лучше просто переложить это на традиционные РСУБД или зажечь альтернативы?
the new information from the table
. Я не уверен, каким должно быть ожидаемое поведение, если изменение в таблице вызывает событие немедленно, в случае, если изменение является обновлением существующей строки, нам может потребоваться вернуться к старым событиям в выходном потоке для обновления, но поскольку поток неизменен, этого не должно происходить. - person Khanh TO   schedule 27.09.2020