Kafka Streams — Неизвестный магический байт на GenericAvroSerde

при попытке передать данные Avro с помощью Kafka Streams я столкнулся с этой ошибкой:

Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Несмотря на то, что я нашел несколько старых тем об этом в списке рассылки, ни одно из заявленных там решений не решило проблему. Так что, надеюсь, я смогу найти решение здесь.

Моя установка выглядит следующим образом:

StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]   
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)  

Я уже пытался установить KEY_SERDE таким же, как VALUE_SERDE, но даже несмотря на то, что это было "помечено" как решение в списке рассылки, в моем случае это не сработало.

Я генерирую GenericData.Record со своей схемой следующим образом:

val record = new GenericData.Record(schema)
...
record.put(field, value)

Когда я запускаю режим отладки и проверяю сгенерированную запись, все выглядит нормально, в записи есть данные и сопоставление правильное.

Я транслирую KStream следующим образом (раньше я использовал ветку):

splitTopics.get(0).to(s"${destTopic}_Testing")

Я использую GenericData.Record для записей. Может ли это быть проблемой в сочетании с GenericAvroSerde?


person Tim.G.    schedule 26.12.2017    source источник
comment
Что не так с общей настройкой? Одна тема? Можете ли вы прочитать данные с потребителем консоли? Возможно, этот пример поможет: github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/   -  person Matthias J. Sax    schedule 26.12.2017
comment
Входная тема имеет необработанный текст (анализируется в Kafka Streams), а затем сопоставляется с GenericData.Record записями. Выходная тема будет содержать данные Avro. Я могу читать сообщения из темы ввода с потребителем консоли. Когда я запускаю его в режиме отладки, я также могу видеть записи до того, как они будут отправлены, с вызовом streams.to, являющимся записями Avro. Это одна входная тема, но несколько выходных тем (на данный момент 4). Выходные темы еще не существуют   -  person Tim.G.    schedule 26.12.2017
comment
Магический байт — это байт, который добавляется слитным клиентом в качестве своего рода маркера перед сериализованным сообщением с использованием Avro. Эта ошибка может означать, что вы пытаетесь десериализовать какое-то сообщение с помощью клиента Confluent, но сообщение не было сериализовано с помощью Avro клиентом Confluent. Вы смешиваете совмещенные и ванильные клиенты Kafka?   -  person Luciano Afranllie    schedule 26.12.2017
comment
@Luciano Afranllie Как я могу проверить это наилучшим образом?   -  person Tim.G.    schedule 26.12.2017
comment
На самом деле это не клиент Confluent, что я имел в виду, я хотел сказать Confluent KafkaAvroSerializer. Проверьте, настроены ли ваши производители на использование этого сериализатора значений, как в этот пример   -  person Luciano Afranllie    schedule 27.12.2017
comment
Ааа, да я. Но помимо примеров я использую параметры KEY_SERDE_CLASS_CONFIG и VALUE_SERDE_CLASS_CONFIG в сочетании с GenericAvroSerde отсюда: github.com/confluentinc/schema-registry/blob/master/avro-serde/   -  person Tim.G.    schedule 27.12.2017
comment
Я думаю, что нашел проблему... Спасибо, что спросили о сериализаторе... Я понял, что пытаюсь десериализовать текст с помощью Avro при использовании Serde... Не просто сериализую.   -  person Tim.G.    schedule 27.12.2017


Ответы (1)


Решение моей проблемы состояло в том, чтобы обменять VALUE_SERDE после десериализации значения String, которое я получаю из моей темы ввода.

Поскольку Serde является комбинированным «элементом» сериализации и десериализации, я не могу просто использовать StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde], но должен использовать StringSerde для десериализации входных записей и только затем использовать AvroSerde для записи в выходной раздел.
Выглядит так в настоящее время:

// default streams configuration serdes are different from the actual output configurations
val streamsConfiguration: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamsConf.getString("APPLICATION_ID"))
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kStreamsConf.getString("BOOTSTRAP_SERVERS_CONFIG"))
  p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kStreamsConf.getString("AUTO_OFFSET_RESET_CONFIG"))
  p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kStreamsConf.getString("SCHEMA_REGISTRY_URL_CONFIG"))
  p
}

// adjusted output serdes for avro records
val keySerde: Serde[String] = Serdes.String
val valSerde: Serde[GenericData.Record] = new GenericAvroSerde()
valSerde.configure(
  Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
    streamsConfiguration.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG)
  ),
  /* isKeySerde = */ false
)

// Now using the adjusted serdes to write to output like this
stream.to(keySerde, valSerde, "destTopic")

Таким образом, это работает как шарм.
Спасибо.

person Tim.G.    schedule 27.12.2017