Я пытаюсь отправить объект в Kafka с помощью сериализатора Avro и реестра схем.
Вот упрощенный код:
Properties props = new Properties();
...
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistryHostname + ":8081");
Producer<String, User> producer = new KafkaProducer(properties);
User user = new User("name", "address", 123);
ProducerRecord record = new ProducerRecord<>(topic, key, user);
producer.send(record);
Я предполагал, что схема считывается «за кулисами» из реестра, а объект (пользователь) сериализуется, но я получаю сообщение об ошибке ниже.
Что мне не хватает?
Должен ли я читать схему явно и отправить GenericRecord?
org.apache.kafka.common.errors.SerializationException: ошибка сериализации сообщения Avro
Причина: java.lang.IllegalArgumentException: неподдерживаемый тип Avro. Поддерживаемые типы: null, Boolean, Integer, Long, Float, Double, String, byte [] и IndexedRecord
на io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema (AbstractKafkaAvroSerDe.java:123) ~ [kafka-avro- serializer-3.3.0.jar! / :?]
в io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl (AbstractKafkaAvroSerializer.java:73) ~ [kafka-avro-serializer-3.3.0.jar! /: ?]
в io.confluent.kafka.serializers.KafkaAvroSerializer.serialize (KafkaAvroSerializer.java:53) ~ [kafka-avro-serializer-3.3.0.jar! / :?]
в org.apache. kafka.clients.producer.KafkaProducer.send (KafkaProducer.java:424) ~ [kafka-clients-0.9.0.1.jar! / :?]