Я пытаюсь преобразовать данные из одной темы (тема чтения) в другую (тему записи) через KsqlDb.
Это данные, которые были созданы для чтения по теме
{
"orderNumber": "01235656",
"deliveryBarcode": "733998877",
"requestId": "1616516663000",
"status": "APPROVED_BY_SUPERVISOR"
}
Я написал эти запросы ksqldb:
-- The general stream to read the topic is like this:
CREATE STREAM GENERAL_STREAM (
deliveryBarcode VARCHAR,
orderNumber VARCHAR,
requestId VARCHAR,
status VARCHAR
) WITH (
kafka_topic = 'read-topic',
value_format = 'json'
);
-- This is the stream to redirect the filtered data throgh 'write-topic'
CREATE STREAM REDIRECTION_STREAM
WITH (
partitions = 6,
replicas = 3,
kafka_topic = 'write-topic',
value_format = 'json'
) AS
SELECT
AS_VALUE(requestId) `requestId`,
ARRAY<STRUCT<
deliveryBarcode
orderNumber
>> `packages`
FROM EARTH_DELIVERY_COURIER_PUDOPACKAGESTATUSUPDATED_0
WHERE (payload -> status = 'APPROVED_BY_SUPERVISOR')
EMIT CHANGES;
Но мой запрос не работает из-за этой части:
ARRAY<STRUCT<
deliveryBarcode
orderNumber
>> `packages`
Мои ожидаемые данные по теме записи выглядят так
{
"requestId": "1616516663000"
"packages":[
{
"ordernumber":"01235656",
"barcodenumber":"733998877"
}
]
}
Как мне изменить эти запросы, чтобы иметь возможность создавать поле «пакеты» в формате массива, как ожидалось?