Лучший способ объединить две (или более) темы кафки в KSQL, генерируя изменения из всех тем?

У нас есть платформа для микросервисов, и мы используем debezium для сбора данных об изменениях из баз данных на этих платформах, что хорошо работает.

Теперь мы хотели бы упростить для нас присоединение к этим темам и передачу результатов в новую тему, которая могла бы использоваться несколькими службами.

Отказ от ответственности: это предполагает v0.11 ksqldb и cli (похоже, что многое из этого может не работать в более старых версиях)

Пример двух таблиц из двух экземпляров базы данных, которые транслируются в темы Kafka:

-- source identity microservice (postgres)
CREATE TABLE public.user_entity (
    id varchar(36) NOT NULL,
    first_name varchar(255) NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');

-- source organization microservice (postgres)
CREATE TABLE public.user_info (
    id varchar(36) NOT NULL,
    user_entity_id varchar(36) NOT NULL,
    business_unit varchar(255) NOT NULL,
    cost_center varchar(255) NOT NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

Вариант 1: потоки

CREATE STREAM stream_user_info_by_user_entity_id
AS SELECT * FROM stream_user_info
PARTITION BY user_entity_id
EMIT CHANGES;

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN stream_user_info_by_user_entity_id ui WITHIN 365 DAYS ON ue.id = ui.user_entity_id 
EMIT CHANGES;

Заметьте WITHIN 365 DAYS, концептуально эти таблицы могут оставаться без изменений очень долгое время, поэтому это окно будет технически бесконечно большим. Это выглядит подозрительно и, кажется, намекает на то, что это не лучший способ сделать это.

Вариант 2: таблицы

CREATE TABLE ktable_user_info_by_user_entity_id (
    user_entity_id,
    first_name,
    business_unit,
    cost_center
)
with (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id 
EMIT CHANGES;

Нам больше не нужно окно WITHIN 365 DAYS, так что это кажется более правильным. Однако это вызывает изменение только тогда, когда сообщение отправляется в поток, а не в таблицу.

В этом примере: Пользователь обновляет first_name - ›изменение отправлено Пользовательские обновления business_unit -› изменения не отправлены

Возможно, есть способ создать объединенный поток, разделенный user_entity_id, и присоединиться к дочерним таблицам, которые будут содержать текущее состояние, что приводит меня к ....

Вариант 3: объединение потока и таблиц

-- "master" change stream with merged stream output
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

CREATE STREAM stream_user_entity_by_id
AS SELECT * FROM stream_user_entity
PARTITION BY id
EMIT CHANGES;

CREATE TABLE ktable_user_entity_by_id (
    id VARCHAR PRIMARY KEY,
    first_name VARCHAR
) with (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');

SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

Этот выглядит лучше всего, но, похоже, в нем много движущихся компонентов для каждой таблицы, у нас есть 2 потока, 1 запрос на вставку, 1 ktable. Другой потенциальной проблемой здесь может быть состояние скрытой гонки, когда поток испускает изменение до того, как таблица будет обновлена ​​под обложками.

Вариант 4: больше объединенных таблиц и потоков

CREATE STREAM stream_user_entity_changes_enriched
AS SELECT 
    ue.id AS user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_by_id ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

CREATE STREAM stream_user_info_changes_enriched
AS SELECT 
    ui.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_info_by_user_entity_id ui
LEFT JOIN ktable_user_entity_by_id ue ON ui.user_entity_id = ue.id
EMIT CHANGES;


CREATE STREAM stream_user_changes_enriched (user_entity_id VARCHAR, first_name VARCHAR, business_unit VARCHAR, cost_center VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes_enriched', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_entity_changes_enriched;
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_info_changes_enriched;

Концептуально это то же самое, что и предыдущее, но слияние происходит после объединений. Вероятно, это могло бы устранить любые потенциальные условия гонки, потому что мы выбираем в первую очередь из потоков, а не из таблиц.

Обратной стороной является то, что сложность даже хуже, чем в варианте 3, и запись и отслеживание всех этих потоков для любых объединений с более чем двумя таблицами было бы ошеломляющим ...

Вопрос: какой метод лучше всего подходит для этого варианта использования и / или мы пытаемся сделать что-то, для чего не следует использовать ksql? Может нам лучше просто переложить это на традиционные РСУБД или зажечь альтернативы?


person Phillip Fleischer    schedule 23.09.2020    source источник
comment
Я думаю, что вариант 2 - это ожидаемое поведение. Хотя изменение в таблице не приводит к немедленному возникновению события, любые последующие изменения в потоке после этого будут вызывать события в потоке вывода с the new information from the table. Я не уверен, каким должно быть ожидаемое поведение, если изменение в таблице вызывает событие немедленно, в случае, если изменение является обновлением существующей строки, нам может потребоваться вернуться к старым событиям в выходном потоке для обновления, но поскольку поток неизменен, этого не должно происходить.   -  person Khanh TO    schedule 27.09.2020
comment
да, я согласен, что это ожидаемое поведение. Я ищу решение, которое по сути работает с этим поведением.   -  person Phillip Fleischer    schedule 30.09.2020
comment
Как вы думаете, таблица-таблица (ksql table) больше подходит в этом случае? То, что вы хотите, чтобы вывод здесь не звучал как поток.   -  person Khanh TO    schedule 01.10.2020
comment
Мне нужен поток, я хочу, чтобы другие приложения могли использовать поток и получать уведомления о любых обновлениях этих двух комбинированных потоков. Я могу присоединиться к ним, например, у потребителя, но если у меня есть несколько сервисов, которым необходимо знать об этом, было бы предпочтительнее иметь один объединенный поток на стороне kafka, и потребитель не должен знать задействованную логику .   -  person Phillip Fleischer    schedule 07.10.2020
comment
Объединенная таблица не является потоком в том смысле, что хранятся исторические записи. Если я прав, приложения, использующие объединенную таблицу, по-прежнему уведомляются о любых обновлениях двух потоков по последнему состоянию объединенных записей. Это похоже на то, что ты хочешь   -  person Khanh TO    schedule 10.10.2020


Ответы (1)


Я попытаюсь ответить на свой вопрос, примите его только в том случае, если за него проголосуют.

Ответ: Вариант 3.

Вот причины для этого варианта использования, это было бы лучше всего, хотя, возможно, это могло быть субъективным

  • Потоки, разделенные первичными и внешними ключами, обычны и просты.
  • Таблицы, основанные на этих потоках, общие и простые.
  • Таблицы, используемые таким образом, не будут являться условием гонки.

У всех вариантов есть свои достоинства, например: если вам не нужны все изменения или данные ведут себя как потоки (журналы или события), а не медленно меняющиеся измерения (таблицы sql).

Что касается условий гонки, таблица слов вводит вас в заблуждение, говоря, что вы на самом деле обрабатываете и сохраняете данные. На самом деле они не являются физическими таблицами, они на самом деле больше похожи на подзапросы в потоках. Примечание: это может быть исключением для таблиц агрегации, которые фактически создают темы (которые я бы предположил, это отдельная тема, но хотелось бы увидеть комментарии)

В конце (в синтаксисе могут быть небольшие ошибки):

---------------------------------------------------------
-- shared objects (likely to be used by multiple queries)
---------------------------------------------------------

-- shared streams wrapping topics
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

-- shared keyed streams (i like to think of them as "indexes")
CREATE STREAM stream_user_entity_by_id AS 
SELECT * FROM stream_user_entity PARTITION BY id
EMIT CHANGES;
CREATE STREAM stream_user_info_by_user_entity_id AS 
SELECT * FROM stream_user_info PARTITION BY user_entity_id
EMIT CHANGES;

-- shared keyed tables (inferring columns with schema registry)
CREATE TABLE ktable_user_entity_by_id (id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');
CREATE TABLE ktable_user_info_by_user_entity_id (user_entity_id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');


---------------------------------------------------------
-- query objects (specific to the produced data)
---------------------------------------------------------
-- "master" change stream (include all tables in join)
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

-- pretty simple looking query
SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

Общие объекты - это в основном схема потоковой передачи (соблазн создать для всех наших тем, но это другой вопрос), а вторая часть похожа на схему запроса. В конечном итоге это функциональный, чистый и повторяемый образец.

person Phillip Fleischer    schedule 27.09.2020