KSQL UDF: DataException: схемы Struct не совпадают

Я пытаюсь создать поток как select (CSAS), поток создается успешно, но когда я пытался отправить сообщения, я получаю следующее исключение.

Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:247)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:116)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:93)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)

Ниже приведены основной поток, постоянный поток и детали функции udf из ksql-cli, не знаю, почему схема несовместима, как вы можете видеть ниже, в потоке processed есть поле с именем article со схемой точно так же, как и в возвращенное значение из функции UDF, я что-то упустил.

ksql> create stream main_stream ( article struct< _id VARCHAR, title VARCHAR, text VARCHAR, action VARCHAR, url VARCHAR, feed_id VARCHAR, mode VARCHAR, score INTEGER, published_at VARCHAR, retrieved_at VARCHAR> ) with  (KAFKA_TOPIC='articles', value_format='JSON');

ksql> create stream processed as select  test(article) article from main_stream;

ksql> describe processed;

Name                 : processed
 Field   | Type
-------------------------------------------------------------------------------------------------------------------------------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 ARTICLE | STRUCT<_ID VARCHAR(STRING), RAW_TITLE VARCHAR(STRING), RAW_TEXT VARCHAR(STRING), PROCESSED_TITLE VARCHAR(STRING), PROCESSED_TEXT VARCHAR(STRING)>
-------------------------------------------------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;


ksql> show queries;

 Query ID      | Kafka Topic | Query String
--------------------------------------------------------------------------------------------------------------------------------------------------------------
 CSAS_processed_20 | processed       | CREATE STREAM processed WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'processed') AS SELECT TEST(MAIN_STREAM.ARTICLE) "ARTICLE"
FROM MAIN_STREAM MAIN_STREAM;
--------------------------------------------------------------------------------------------------------------------------------------------------------------

ksql> describe function test;

Name        : TEST
Overview    : test udf
Type        : scalar
Jar         : /Users/ktawfik/libs/custom-udf.jar
Variations  :

    Variation   : TEST(article STRUCT<_ID VARCHAR, TITLE VARCHAR, TEXT VARCHAR, ACTION VARCHAR, URL VARCHAR, FEED_ID VARCHAR, MODE VARCHAR, SCORE INT, PUBLISHED_AT VARCHAR, RETRIEVED_AT VARCHAR>)
    Returns     : STRUCT<_ID VARCHAR, RAW_TITLE VARCHAR, RAW_TEXT VARCHAR, PROCESSED_TITLE VARCHAR, PROCESSED_TEXT VARCHAR>
    Description : test
    article     : A complete article object

Также под кодом UDF, который я использовал

@Udf(description = "test",
                    schema = "struct< _id VARCHAR, raw_title VARCHAR, raw_text VARCHAR, processed_title VARCHAR, processed_text VARCHAR>")
    public Struct processDocument(
            @UdfParameter(
                    schema = "struct< _id VARCHAR, title VARCHAR, text VARCHAR, action VARCHAR, url VARCHAR, feed_id VARCHAR, mode VARCHAR, score INTEGER, published_at VARCHAR, retrieved_at VARCHAR>",
                    value = "article",
                    description = "A complete article object") Struct struct) {

        Schema ARTICLE_SCHEMA = SchemaBuilder.struct()
                .field("_id", Schema.STRING_SCHEMA)
                .field("raw_title", Schema.STRING_SCHEMA)
                .field("raw_text", Schema.STRING_SCHEMA)
                .field("processed_title", Schema.STRING_SCHEMA)
                .field("processed_text", Schema.STRING_SCHEMA)
                .build();


Struct proStruct = new Struct(ARTICLE_SCHEMA);
        proStruct.put("_id", "1234");
        proStruct.put("raw_title", "RAW_TITLE___1234");
        proStruct.put("raw_text", "RAW_TEXT___1234");
        proStruct.put("processed_title", "TITLE____1234");
        proStruct.put("processed_text", "TEXT____1234");
        System.out.println(proStruct);
// Struct{_id=1234,raw_title=RAW_TITLE___1234,raw_text=RAW_TEXT___1234,processed_title=TITLE____1234,processed_text=TEXT____1234}
        return proStruct;

    }

person Karim Tawfik    schedule 10.11.2019    source источник


Ответы (2)


Я пытался решить проблему таким же образом, но я борюсь со следующим случаем:

UDF:

@UdfDescription(name = "ValueUnpacker", description = "..")
public class ValueUnpacker {
    private Schema valueSchema = SchemaBuilder.struct()
            .field("LABEL", Schema.INT32_SCHEMA)
            .build();


    @Udf(description = "Test a string", schema = "struct<LABEL INT>")
    public Struct unpackValue(@UdfParameter(value = "thingType", description = "a thing type") String thingType) {
        Struct ret = new Struct(valueSchema);
        int i = 5;
        ret.put("LABEL", i);
        System.out.println("Ret: " + ret);
        return ret;
    }
}

Запускаем ksqldb и набираем:

ksql> SELECT valueunpacker('test') FROM SOME_STREAM EMIT CHANGES;
|{LABEL=5}
|{LABEL=5}  

и он отлично работает. Но создавая поток как

CREATE STREAM CONSTANT_STREAM AS SELECT valueunpacker('test') FROM SOME_STREAM;

происходит сбой без вывода потока. Журнал ksqldb приводит к той же проблеме: «Схемы не совпадают».

person sfrehse    schedule 25.02.2020
comment
Проблему можно решить, сделав все поля необязательными, как требуется в ksqldb. Это означает, private Schema valueSchema = SchemaBuilder.struct() .field("LABEL", Schema.OPTIONAL_INT32_SCHEMA).optional .build(); - person sfrehse; 26.02.2020
comment
Этот комментарий является фактическим ответом, и его следует отредактировать в ответе выше. Также не следует использовать какие-либо другие функции определения схемы, например name (), version (). - person Simon; 10.04.2020

Мне удалось выяснить проблему и решить ее, в основном это тот факт, что движок KSQL переводит поля схемы в ВЕРХНИЙ регистр, поэтому, когда я отправляю поля с нижним регистром, он не мог сопоставить его, что неясно в документы.

Исправление в том, что у меня должны быть:

  1. Все поля в программно определенной SCHEMA в верхнем регистре, А ТАКЖЕ поле схемы в аннотации @UDF.
  2. Все поля в программно определенной схеме должны точно соответствовать всем полям (именам и типам) в поле схемы в аннотации @UDF.

Код, наконец, выглядел так:

@Udf(description = "test",
                    schema = "struct< _ID VARCHAR, RAW_TITLE VARCHAR, RAW_TEXT VARCHAR, PROCESSED_TITLE VARCHAR, PROCESSED_TEXT VARCHAR>")
    public Struct processDocument(
            @UdfParameter(
                    schema = "struct< _id VARCHAR, title VARCHAR, text VARCHAR, action VARCHAR, url VARCHAR, feed_id VARCHAR, mode VARCHAR, score INTEGER, published_at VARCHAR, retrieved_at VARCHAR>",
                    value = "article",
                    description = "A complete article object") Struct struct) {

        Schema ARTICLE_SCHEMA = SchemaBuilder.struct()
                .field("_ID", Schema.STRING_SCHEMA)
                .field("RAW_TITLE", Schema.STRING_SCHEMA)
                .field("RAW_TEXT", Schema.STRING_SCHEMA)
                .field("PROCESSED_TITLE", Schema.STRING_SCHEMA)
                .field("PROCESSED_TEXT", Schema.STRING_SCHEMA)
                .build();


Struct proStruct = new Struct(ARTICLE_SCHEMA);
        proStruct.put("_ID", "1234");
        proStruct.put("RAW_TITLE", "RAW_TITLE___1234");
        proStruct.put("RAW_TEXT", "RAW_TEXT___1234");
        proStruct.put("PROCESSED_TITLE", "TITLE____1234");
        proStruct.put("PROCESSED_TEXT", "TEXT____1234");
        System.out.println(proStruct);
        return proStruct;

    }
person Karim Tawfik    schedule 10.11.2019
comment
Так я не мог решить эту проблему. Пробовал несколько вариантов с прописными буквами. Однако я пытаюсь вернуть List<Struct>, но по-прежнему возникают Schemas do not match ошибки. - person sfrehse; 24.02.2020
comment
Не уверен в возвращении списка Struct, попробуйте сначала вернуть одну запись - решение должно работать - можете ли вы поделиться тем, что вы добавили в @UDF и возвращение - person Karim Tawfik; 25.02.2020