Confluent и Cassandra: получение исключения данных: не удалось десериализовать данные в Avro, неизвестный магический байт

Я следовал руководству из http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/, и я могу вставлять данные из консоли avro в cassandra. Теперь я пытаюсь расширить это, чтобы использовать flume, и у меня на моей машине настроен поток, который выбирает файл журнала и отправляет его в kafka, пытаясь вставить мои данные в базу данных cassandra. В текстовый файл я помещаю данные

{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2}

{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99,5}

{"id": 3, "created": "2016-05-06 13:55:00", "product": "FU-DATAMOUNTAINEER-20150201-100", "price": 10000}

{"id": 4, "created": "2016-05-06 13:56:00", "product": "FU-KOSPI-C-20150201-100", "price": 150}

Flume собирает эти данные и отправляет их в kafka.

В раковине кассандры я столкнулся с ошибкой,

ОШИБКА Задача cassandra -ink-orders-0 вызвала неперехваченное и неустранимое исключение (org.apache.kafka.connect.runtime.WorkerTask: 142) org.apache.kafka.connect.errors.DataException: Не удалось десериализовать данные в Avro: at io.confluent.connect.avro.AvroConverter.toConnectData (AvroConverter.java:109) на org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages (WorkerSinkTask.java:346) на org.apache.kafka. WorkerSinkTask.poll (WorkerSinkTask.java:226) в org.apache.kafka.connect.runtime.WorkerSinkTask.iteration (WorkerSinkTask.java:170) в org.apache.kafka.connect.runtime.WorkerSinkTask.exe 142) на org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140) на org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175) на java.util.concurrent .Executors $ RunnableAdapter.call (Executors.java:511) в java.util.concurrent. FutureTask.run (FutureTask.java:266) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (Threadangor.javaExecutor $ Worker.run (Threadangor.javaExecutor) в $ Worker.run. .Thread.run (Thread.java:745) Вызвано: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1 Вызвано: org.apache.kafka.common.errors.SerializationException: Неизвестно волшебный байт! [2016-09-28 15: 47: 00,951] ОШИБКА Задача завершается и не будет восстановлена, пока не будет перезапущена вручную (org.apache.kafka.connect.runtime.WorkerTask: 143) [2016-09-28 15: 47: 00,951 ] ИНФОРМАЦИЯ Остановка раковины Cassandra. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask: 79) [2016-09-28 15: 47: 00,952] ИНФОРМАЦИЯ Завершение сеанса драйвера Cassandra и кластера. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter: 165)

Схема, которую я использую

 ./confluent/bin/kafka-avro-console-producer \--broker-list localhost:9092 \--topic orders-topic \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}'

Конфиг для потока: Flume-kafka.conf.properties

agent.sources = spoolDirSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink


agent.sources.spoolDirSrc.type = spooldir
agent.sources.spoolDirSrc.spoolDir = eventlogs
agent.sources.spoolDirSrc.inputCharset = UTF-8
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576

agent.sources.spoolDirSrc.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory

agent.channels.memoryChannel.capacity = 1000

 agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
 agent.sinks.kafkaSink.topic = orders-topic
 agent.sinks.kafkaSink.brokerList = localhost:9092
 agent.sinks.kafkaSink.channel = memoryChannel
 agent.sinks.kafkaSink.batchSize = 20

Кто-нибудь может мне помочь, как исправить эту ошибку?


person Chirag    schedule 28.09.2016    source источник


Ответы (1)


Обычно, если у вас есть неизвестный магический байт, это означает, что ваши клиентская и серверная версии для Kafka несовместимы. Убедитесь, что ваша версия приемника Cassandra была построена с использованием клиентских библиотек Kafka версии, меньшей или равной версии ваших брокеров.

person dawsaw    schedule 08.10.2016
comment
Я проверил, все банки для 0,10, и я все еще получаю ту же ошибку - person Chirag; 11.10.2016
comment
Знает ли flume, как получить схему из реестра схем? Извините, я не слишком знаком с этой настройкой, но, похоже, проблема с десериализацией может быть связана с тем, что сериализатор не знает, как искать схему с помощью магического байта? - person dawsaw; 13.10.2016
comment
Ни один поток не знает, но я ожидаю, что смогу предоставить схему при сериализации - person Chirag; 13.10.2016
comment
Я пытаюсь сделать это в файле conf в flume, и это agent.sinks.kafkaSink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer agent.sinks.kafkaSink.interceptors = i1 agent.sinks.kafkaSink.interceptors .i1.type = static agent.sinks.kafkaSink.interceptors.i1.key = flume.avro.schema.literal agent.sinks.kafkaSink.interceptors.i1.value = [{\ name \: \ id \, \ type \ : \ int \}, {\ name \: \ created \, \ type \: \ string \}, {\ name \: product \, \ type \: \ string \}, {\ name \: \ price \, \ тип \: \ двойной \}] - person Chirag; 13.10.2016