Проблемы кодирования/форматирования с библиотекой python kafka

Я уже некоторое время пытаюсь использовать библиотеку python kafka. и не может заставить продюсера работать.

После небольшого исследования я обнаружил, что kafka отправляет (и я предполагаю, что тоже ожидает) дополнительный 5-байтовый заголовок (один 0 байт, один длинный, содержащий идентификатор схемы для реестра схемы) потребителям. Мне удалось заставить потребителя работать, просто удалив первые байты.

Должен ли я добавлять аналогичный заголовок при написании продюсера?

Ниже исключения, которое выходит:

    [2016-09-14 13:32:48,684] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
    Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Я использую последние стабильные версии как kafka, так и python-kafka.

ИЗМЕНИТЬ

Потребитель

from kafka import KafkaConsumer
import avro.io
import avro.schema
import io
import requests
import struct

# To consume messages
consumer = KafkaConsumer('hadoop_00',
                         group_id='my_group',
                         bootstrap_servers=['hadoop-master:9092'])

schema_path = "resources/f1.avsc"
for msg in consumer:
    value = bytearray(msg.value)
    schema_id = struct.unpack(">L", value[1:5])[0]
    response = requests.get("http://hadoop-master:8081/schemas/ids/" + str(schema_id))
    schema = response.json()["schema"]
    schema = avro.schema.parse(schema)
    bytes_reader = io.BytesIO(value[5:])
    # bytes_reader = io.BytesIO(msg.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    temp = reader.read(decoder)
    print(temp)

Продюсер

from kafka import KafkaProducer
import avro.schema
import io
from avro.io import DatumWriter

producer = KafkaProducer(bootstrap_servers="hadoop-master")

# Kafka topic
topic = "hadoop_00"

# Path to user.avsc avro schema
schema_path = "resources/f1.avsc"
schema = avro.schema.parse(open(schema_path).read())
range = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for i in range:
    producer.send(topic, b'{"f1":"value_' + str(i))

person Moises Jimenez    schedule 14.09.2016    source источник
comment
Ваш код для производителя и потребителя, пожалуйста. Это очень поможет сфокусировать все.   -  person thiruvenkadam    schedule 14.09.2016
comment
@thiruvenkadam ну вот   -  person Moises Jimenez    schedule 14.09.2016


Ответы (2)


Я могу заставить своего производителя python отправлять сообщения в Kafka-Connect с помощью Schema-Registry:

...
import avro.datafile
import avro.io
import avro.schema
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='kafka:9092')
with open('schema.avsc') as f:
    schema = avro.schema.Parse(f.read())

def post_message():
    bytes_writer = io.BytesIO()
    # Write the Confluent "Magic Byte"
    bytes_writer.write(bytes([0]))
    # Should get or create the schema version with Schema-Registry
    ...
    schema_version = 1
    bytes_writer.write(
        int.to_bytes(schema_version, 4, byteorder='big'))

    # and then the standard Avro bytes serialization
    writer = avro.io.DatumWriter(schema)
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write({'key': 'value'}, encoder)
    producer.send('topic', value=bytes_writer.getvalue())

Документация о «Волшебном байте»: https://github.com/confluentinc/schema-registry/blob/master/docs/serializer-formatter.rst

person Yann    schedule 16.09.2016
comment
Пришлось обновиться до python 3.5 и использовать библиотеку avro-python3, чтобы заставить его работать, спасибо! - person Moises Jimenez; 19.09.2016

Поскольку вы читаете с помощью BinaryDecoder и DatumReader, если вы отправляете данные в обратном порядке (используя DatumWriter с BinaryEncoder в качестве кодировщика), ваши сообщения, я полагаю, будут в порядке.

Что-то вроде этого:

Продюсер

from kafka import KafkaProducer
import avro.schema
import io
from avro.io import DatumWriter, BinaryEncoder
producer = KafkaProducer(bootstrap_servers="hadoop-master")

# Kafka topic
topic = "hadoop_00"

# Path to user.avsc avro schema
schema_path = "resources/f1.avsc"
schema = avro.schema.parse(open(schema_path).read())
# range is a bad variable name. I changed it here
value_range = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for i in value_range:
    datum_writer = DatumWriter(schema)
    byte_writer = io.BytesIO()
    datum_encoder = BinaryEncoder(byte_writer)
    datum_writer.write({"f1" : "value_%d" % (i)}, datum_encoder)
    producer.send(topic, byte_writer.getvalue())

Несколько изменений, которые я сделал:

  • используйте DatumWriter и BinaryEncoder
  • Вместо json я отправляю словарь в потоке байтов (возможно, вам придется проверить свой код с помощью обычного словаря, и он тоже может работать, но я не уверен)
  • Отправка сообщения в тему kafka с использованием потока байтов (у меня иногда это не удавалось, и в этих случаях я присваивал метод .getvalue переменной и использовал переменную в производитель. отправить. Я не знаю причину сбоя но присваивание переменной всегда работало)

Я не смог проверить код, который я добавил. Но это фрагмент кода, который я написал ранее, используя avro. Если это не работает для вас, пожалуйста, дайте мне знать в комментариях. Возможно, это из-за моей ржавой памяти. Я обновлю этот ответ рабочим, как только доберусь до дома, где смогу протестировать код.

person thiruvenkadam    schedule 14.09.2016
comment
Привет! Спасибо за вашу помощь. К сожалению, я проверил это и получил такое же исключение в отношении этого магического байта. - person Moises Jimenez; 14.09.2016
comment
Кроме того, теперь я написал небольшой производитель на java, используя API kafka java, и я получаю точно такую ​​​​же ошибку. - person Moises Jimenez; 14.09.2016