KSQLDB: выберите поля как массив

Я пытаюсь преобразовать данные из одной темы (тема чтения) в другую (тему записи) через KsqlDb.

Это данные, которые были созданы для чтения по теме

{
  "orderNumber": "01235656",
  "deliveryBarcode": "733998877",
  "requestId": "1616516663000",
  "status": "APPROVED_BY_SUPERVISOR"
}

Я написал эти запросы ksqldb:

-- The general stream to read the topic is like this:
CREATE STREAM GENERAL_STREAM (
    deliveryBarcode VARCHAR,
    orderNumber VARCHAR,
    requestId VARCHAR,
    status VARCHAR
) WITH (
    kafka_topic = 'read-topic',
    value_format = 'json'
);


-- This is the stream to redirect the filtered data throgh 'write-topic'
CREATE STREAM REDIRECTION_STREAM
WITH (
    partitions = 6,
    replicas = 3,
    kafka_topic = 'write-topic',
    value_format = 'json'
) AS
SELECT
       AS_VALUE(requestId) `requestId`,
       ARRAY<STRUCT<
        deliveryBarcode 
        orderNumber
       >> `packages`
FROM EARTH_DELIVERY_COURIER_PUDOPACKAGESTATUSUPDATED_0
WHERE (payload -> status = 'APPROVED_BY_SUPERVISOR')
EMIT CHANGES;

Но мой запрос не работает из-за этой части:

ARRAY<STRUCT<
        deliveryBarcode 
        orderNumber
       >> `packages`

Мои ожидаемые данные по теме записи выглядят так

{
  "requestId": "1616516663000"
  "packages":[
    {
      "ordernumber":"01235656",
      "barcodenumber":"733998877"
    }
  ]
}

Как мне изменить эти запросы, чтобы иметь возможность создавать поле «пакеты» в формате массива, как ожидалось?


person Sinan Türemiş    schedule 24.03.2021    source источник


Ответы (1)


Вы можете поиграть с функциями array () и as_map (), чтобы сгенерировать ожидаемый результат.

Вот CSAS, который я использовал для этого трюка:

CREATE STREAM REDIRECTION_STREAM 
WITH (kafka_topic='write_topic', value_format='json') 
AS SELECT 
  AS_VALUE(requestId) requestId, 
  ARRAY[
    AS_MAP(
      ARRAY['deliverybarcode', 'ordernumber'], 
      ARRAY[deliverybarcode, ordernumber]
    )
  ] packages 
FROM GENERAL_STREAM;

Вот результат вышеупомянутой темы

print 'write_topic' from BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2021/03/24 13:24:44.557 Z, key: <null>, value: {"REQUESTID":"1616516663000","PACKAGES":[{"ordernumber":"01235656","deliverybarcode":"733998877"}]}, partition: 0
^CTopic 
person Sergio    schedule 24.03.2021
comment
Отличное решение Серхио. Оно работает. Спасибо! - person Sinan Türemiş; 24.03.2021