Есть способ решить эту проблему. Уловка состоит в том, чтобы использовать формат значения KAFKA
для записи надгробия в основную тему.
Вот пример, использующий ваш исходный DDL.
-- Insert a second row of data
INSERT INTO MOVIES (ID, TITLE, RELEASE_YEAR) VALUES (42, 'Life of Brian', 1986);
-- Query table
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> select * from movies emit changes limit 2;
+--------------------------------+--------------------------------+--------------------------------+
|TITLE |ID |RELEASE_YEAR |
+--------------------------------+--------------------------------+--------------------------------+
|Life of Brian |42 |1986 |
|Aliens |48 |1986 |
Limit Reached
Query terminated
Теперь объявите новый поток, который будет писать в ту же тему Kafka, используя тот же ключ:
CREATE STREAM MOVIES_DELETED (title VARCHAR KEY, DUMMY VARCHAR)
WITH (KAFKA_TOPIC='movies',
VALUE_FORMAT='KAFKA');
Вставьте сообщение-надгробие:
INSERT INTO MOVIES_DELETED (TITLE,DUMMY) VALUES ('Aliens',CAST(NULL AS VARCHAR));
Снова запросите таблицу:
ksql> select * from movies emit changes limit 2;
+--------------------------------+--------------------------------+--------------------------------+
|TITLE |ID |RELEASE_YEAR |
+--------------------------------+--------------------------------+--------------------------------+
|Life of Brian |42 |1986 |
Изучите основную тему
ksql> print movies;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/02/22 11:01:05.966 Z, key: Aliens, value: {"ID":48,"RELEASE_YEAR":1986}, partition: 0
rowtime: 2021/02/22 11:02:00.194 Z, key: Life of Brian, value: {"ID":42,"RELEASE_YEAR":1986}, partition: 0
rowtime: 2021/02/22 11:04:52.569 Z, key: Aliens, value: <null>, partition: 0
person
Robin Moffatt
schedule
22.02.2021