Как удалить значение из таблицы ksqldb или вставить значение захоронения?

Как можно пометить строку в таблице ksql для удаления через Rest api или хотя бы как инструкцию в ksqldb-cli?

CREATE TABLE movies (
      title VARCHAR PRIMARY KEY,
      id INT,
      release_year INT
    ) WITH (
      KAFKA_TOPIC='movies',
      PARTITIONS=1,
      VALUE_FORMAT = 'JSON'
    );

INSERT INTO MOVIES (ID, TITLE, RELEASE_YEAR) VALUES (48, 'Aliens', 1986);

Это не работает по очевидным причинам, но оператор DELETE не существует в ksqldb:

INSERT INTO MOVIES (ID, TITLE, RELEASE_YEAR) VALUES (48, null, null);

Есть ли способ создать рекомендуемое нулевое значение надгробной плиты или мне нужно записать его непосредственно в основную тему?


person Kubus    schedule 21.02.2021    source источник


Ответы (1)


Есть способ решить эту проблему. Уловка состоит в том, чтобы использовать формат значения KAFKA для записи надгробия в основную тему.

Вот пример, использующий ваш исходный DDL.

-- Insert a second row of data
INSERT INTO MOVIES (ID, TITLE, RELEASE_YEAR) VALUES (42, 'Life of Brian', 1986);

-- Query table
ksql> SET 'auto.offset.reset' = 'earliest';

ksql> select * from movies emit changes limit 2;
+--------------------------------+--------------------------------+--------------------------------+
|TITLE                           |ID                              |RELEASE_YEAR                    |
+--------------------------------+--------------------------------+--------------------------------+
|Life of Brian                   |42                              |1986                            |
|Aliens                          |48                              |1986                            |
Limit Reached
Query terminated

Теперь объявите новый поток, который будет писать в ту же тему Kafka, используя тот же ключ:

CREATE STREAM MOVIES_DELETED (title VARCHAR KEY, DUMMY VARCHAR) 
  WITH (KAFKA_TOPIC='movies', 
       VALUE_FORMAT='KAFKA');

Вставьте сообщение-надгробие:

INSERT INTO MOVIES_DELETED (TITLE,DUMMY) VALUES ('Aliens',CAST(NULL AS VARCHAR));

Снова запросите таблицу:

ksql> select * from movies emit changes limit 2;
+--------------------------------+--------------------------------+--------------------------------+
|TITLE                           |ID                              |RELEASE_YEAR                    |
+--------------------------------+--------------------------------+--------------------------------+
|Life of Brian                   |42                              |1986                            |

Изучите основную тему

ksql> print movies;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/02/22 11:01:05.966 Z, key: Aliens, value: {"ID":48,"RELEASE_YEAR":1986}, partition: 0
rowtime: 2021/02/22 11:02:00.194 Z, key: Life of Brian, value: {"ID":42,"RELEASE_YEAR":1986}, partition: 0
rowtime: 2021/02/22 11:04:52.569 Z, key: Aliens, value: <null>, partition: 0
person Robin Moffatt    schedule 22.02.2021
comment
Спасибо, Робин, за обходной путь. Об этом следует сообщить команде ksqldb. Должно быть проще вставлять удаленные значения захоронения (без промежуточных потоков или форматов значений). Я не получил row.tombstone в ответном json-объекте в push-запрос таким образом. - person Kubus; 22.02.2021
comment
Не стесняйтесь проголосовать / пометить как правильный, если вы считаете, что это так;) Вы можете регистрировать проблемы и предлагать улучшения для ksqlDB здесь: github.com/confluentinc/ksql - person Robin Moffatt; 22.02.2021
comment
конечно, я проголосовал за вас, мне просто интересно, почему я не получил строку, похожую на эту {row: {columns: ['Alien', null, null], tombstone: true}} через Rest API, потому что это была моя оригинальная проблема :) В следующий раз я буду более многословен, но я ожидал, что это сработает, после того, как я добьюсь, чтобы вставить значение надгробной плиты. снова спасибо - person Kubus; 22.02.2021