Kafka Avro Consumer с проблемами декодера

Когда я попытался запустить Kafka Consumer с Avro над данными с моей соответствующей схемой, он вернул ошибка «AvroRuntimeException: неверные данные. Длина отрицательная: -40». Я вижу, что у других были похожие проблемы преобразование массива байтов в json, Avro пишет и читает и Kafka Avro Binary * coder. Я также сослался на этот пример группы потребителей, в котором есть все был полезен, однако пока не помог с этой ошибкой .. Он работает до этой части кода (строка 73)

Декодер decoder = DecoderFactory.get (). BinaryDecoder (byteArrayInputStream, null);

Я пробовал другие декодеры и распечатал содержимое переменной byteArrayInputStream, которая выглядит так, как я считаю, вы ожидаете, что сериализованные данные avro будут выглядеть (в сообщении я вижу схему, некоторые данные и некоторые искаженные данные). Байт доступно с использованием метода .available (), который возвращает 594. Мне трудно понять, почему возникает эта ошибка. Apache Nifi используется для создания потока Kafka с той же схемой из hdfs. Буду признателен за любую помощь.


person SparkleGoat    schedule 15.03.2016    source источник


Ответы (1)


Возможно, проблема заключается в несоответствии между тем, как данные Avro записываются (кодируются) Nifi, и тем, как ваше потребительское приложение считывает (декодирует) данные.

Вкратце, API Avro предоставляет два разных подхода к сериализации:

  1. Для создания правильных файлов Avro: для кодирования записей данных, а также для встраивания схемы Avro в своего рода преамбулу (через org.apache.avro.file.{DataFileWriter/DataFileReader}). Встраивание схемы в файлы Avro имеет большой смысл, потому что (а) обычно «полезная нагрузка» файлов Avro на порядки больше, чем встроенная схема Avro, и (б) вы можете затем копировать или перемещать эти файлы по своему усмотрению. и, тем не менее, будьте уверены, что сможете прочитать их снова, не консультируясь ни с кем или с чем-то.
  2. Кодировать только записи данных, т.е. не встраивать схему (через org.apache.avro.io.{BinaryEncoder/BinaryDecoder}; обратите внимание на разницу в имени пакета: io здесь и file выше). Этот подход часто предпочитается, когда сообщения Avro-кодирования, которые записываются в тему Kafka, например, потому что по сравнению с вариантом 1 выше вы не несете накладных расходов на повторное встраивание схемы Avro в каждое отдельное сообщение, предполагая, что ваш (очень разумная) политика заключается в том, что для одной и той же темы Kafka сообщения форматируются / кодируются с использованием одной и той же схемы Avro. Это значительное преимущество, потому что в контексте потоковых данных запись данных в движении обычно намного меньше (обычно от 100 байт до нескольких сотен КБ), чем файлы Avro с данными в состоянии покоя, как описано выше (часто сотни или тысячи МБ); поэтому размер схемы Avro относительно велик, и поэтому вы не хотите встраивать ее 2000x при записи 2000 записей данных в Kafka. Недостатком является то, что вы должны «каким-то образом» отслеживать, как схемы Avro сопоставляются с темами Kafka - или, точнее, вы должны каким-то образом отслеживать, с помощью какой схемы Avro было закодировано сообщение, не переходя по пути непосредственного встраивания схемы. Хорошая новость заключается в том, что в экосистеме Kafka (реестр схем Avro) есть инструменты для этого. прозрачно. Таким образом, по сравнению с вариантом 1, вариант 2 выигрывает в эффективности за счет удобства.

Эффект заключается в том, что «формат провода» для закодированных данных Avro будет выглядеть по-разному в зависимости от того, используете ли вы (1) или (2) выше.

Я не очень хорошо знаком с Apache Nifi, но беглый взгляд на исходный код (например, ConvertAvroToJSON.java) подсказывает мне, что он использует вариант 1, т.е. он встраивает схему Avro вместе с записями Avro. Однако ваш потребительский код использует DecoderFactory.get().binaryDecoder() и, следовательно, вариант 2 (без встроенной схемы).

Возможно, это объясняет ошибку, с которой вы столкнулись?

person Michael G. Noll    schedule 16.03.2016
comment
СПАСИБО, @miguno, именно так! Я качаюсь и перекатываюсь с помощью Decoder на DataFileReader с двумя изменениями строки. DatumReader ‹GenericRecord› datumReader = новый SpecificDatumReader ‹GenericRecord› (схема); DataFileStream ‹GenericRecord› dataFileReader = новый DataFileStream ‹GenericRecord› (inputStream, datumReader); - person SparkleGoat; 16.03.2016
comment
Исправление * Я качаюсь и катаюсь сейчас, когда я перешел на DataFileReader с двумя изменениями строки. Вы правы, binaryDecoder не подошел для этой работы. - person SparkleGoat; 16.03.2016