Как отправить сообщение в Kafka с помощью сериализатора Avro и реестра схем

Я пытаюсь отправить объект в Kafka с помощью сериализатора Avro и реестра схем.
Вот упрощенный код:

    Properties props = new Properties();
    ...
    props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistryHostname + ":8081");

    Producer<String, User> producer = new KafkaProducer(properties);

    User user = new User("name", "address", 123);
    ProducerRecord record = new ProducerRecord<>(topic, key, user);
    producer.send(record);

Я предполагал, что схема считывается «за кулисами» из реестра, а объект (пользователь) сериализуется, но я получаю сообщение об ошибке ниже.
Что мне не хватает?
Должен ли я читать схему явно и отправить GenericRecord?

org.apache.kafka.common.errors.SerializationException: ошибка сериализации сообщения Avro
Причина: java.lang.IllegalArgumentException: неподдерживаемый тип Avro. Поддерживаемые типы: null, Boolean, Integer, Long, Float, Double, String, byte [] и IndexedRecord
на io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema (AbstractKafkaAvroSerDe.java:123) ~ [kafka-avro- serializer-3.3.0.jar! / :?]
в io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.java:73) ~ [kafka-avro-serializer-3.3.0.jar! /: ?]
в io.confluent.kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:53) ~ [kafka-avro-serializer-3.3.0.jar! / :?]
в org.apache. kafka.clients.producer.KafkaProducer.send (KafkaProducer.java:424) ~ [kafka-clients-0.9.0.1.jar! / :?]


person msayag    schedule 31.10.2017    source источник


Ответы (2)


Ваш код кажется правильным. Единственное, чего может не хватать, это то, что ваш объект AVRO не был должным образом сгенерирован с помощью какого-либо плагина AVRO, это означает, что ваш класс должен реализовывать SpecificRecords, который реализует IndexedRecord.

person hlagos    schedule 31.10.2017
comment
Верно. Кажется, все работает после того, как я создал правильный файл avsc и сгенерировал из него файл Java с помощью gradle-avro-plugin. Я предполагал, что схема создается Avro автоматически, с использованием рефлексии, я ошибался. - person msayag; 01.11.2017

Ваш код кажется правильным, U, должно быть, не создал правильную структуру из файла avsc с помощью mvn generate sources с maven (передайте эту команду на терминал в папке вашего проекта)

Затем он создаст bean-компонент, в котором u может передавать значения как

User order = User.newBuilder()
        .setName("xyz")
        .setAddress("CId432")
        .setPrice("123")
        .build();
person Tanmay Naik    schedule 07.06.2019