при попытке передать данные Avro с помощью Kafka Streams я столкнулся с этой ошибкой:
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Несмотря на то, что я нашел несколько старых тем об этом в списке рассылки, ни одно из заявленных там решений не решило проблему. Так что, надеюсь, я смогу найти решение здесь.
Моя установка выглядит следующим образом:
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)
Я уже пытался установить KEY_SERDE
таким же, как VALUE_SERDE
, но даже несмотря на то, что это было "помечено" как решение в списке рассылки, в моем случае это не сработало.
Я генерирую GenericData.Record
со своей схемой следующим образом:
val record = new GenericData.Record(schema)
...
record.put(field, value)
Когда я запускаю режим отладки и проверяю сгенерированную запись, все выглядит нормально, в записи есть данные и сопоставление правильное.
Я транслирую KStream следующим образом (раньше я использовал ветку):
splitTopics.get(0).to(s"${destTopic}_Testing")
Я использую GenericData.Record
для записей. Может ли это быть проблемой в сочетании с GenericAvroSerde
?
GenericData.Record
записями. Выходная тема будет содержать данные Avro. Я могу читать сообщения из темы ввода с потребителем консоли. Когда я запускаю его в режиме отладки, я также могу видеть записи до того, как они будут отправлены, с вызовомstreams.to
, являющимся записями Avro. Это одна входная тема, но несколько выходных тем (на данный момент 4). Выходные темы еще не существуют - person Tim.G.   schedule 26.12.2017KEY_SERDE_CLASS_CONFIG
иVALUE_SERDE_CLASS_CONFIG
в сочетании сGenericAvroSerde
отсюда: github.com/confluentinc/schema-registry/blob/master/avro-serde/ - person Tim.G.   schedule 27.12.2017