сообщение не начинается с магического байта

Я пытаюсь создать данные в кодировке avro в теме kafka, используя пакет / linkedin / goavro в Go. Цель состоит в том, чтобы иметь возможность использовать тему с использованием разных клиентов.

Сначала я регистрирую схему следующим образом:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\":\"test_topic2\",\"type\":\"record\", \"fields\":[{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"password\",\"size\":10,\"type\":\"string\"}]}"}' http://localhost:8081/subjects/test_topic2-value/versions

Затем я создаю данные avro, создаю и использую их с помощью Go.

package main

import (

    "github.com/Shopify/sarama"
    "github.com/linkedin/goavro"
    "fmt"

)
const (
    brokers = "localhost:9092"
    topic     = "test_topic2"
)

const loginEventAvroSchema = `{"name":"test_topic2","type":"record", "fields":[{"name":"user","type":"string"},{"name":"password","size":10,"type":"string"}]}`

func main() {

// Create Message

codec, err := goavro.NewCodec(loginEventAvroSchema)
if err != nil {
    panic(err)
}

m := map[string]interface{}{
    "user": "pikachu", "password": 231231,
}

single, err := codec.SingleFromNative(nil, m)
if err != nil {
    panic(err)
}


// Producer
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Producer.Return.Successes = true

    config.Version = sarama.V2
confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at test_topic2 [0] offset 1: message does not start with magic byte
0_0 //get broker cluster, err := sarama.NewSyncProducer(brokers, config) if err != nil { panic(err) } defer func() { if err := cluster.Close(); err != nil { panic(err) } }() msg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.StringEncoder(single), } cluster.SendMessage(msg) // Consumer clusterConsumer, err := sarama.NewConsumer(brokers, config) if err != nil { panic(err) } defer func() { if err := clusterConsumer.Close(); err != nil { panic(err) } }() msgK, _ := clusterConsumer.ConsumePartition(topic, 0, sarama.OffsetOldest) for { q := <-msgK.Messages() native, _, err := codec.NativeFromSingle([]byte(q.Value)) if err != nil { fmt.Println(err) } fmt.Println(native) }

Этот код отлично работает, и я могу успешно создавать и использовать сообщения в теме kafka.

Теперь я пытаюсь использовать темы из python avro-consumer:

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


c = AvroConsumer({
    'bootstrap.servers': 'localhost',
    'group.id': 'groupid',
    'schema.registry.url': 'http://localhost:8081',
    'auto.offset.reset': 'earliest'})


c.subscribe(['test_topic2'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value(), msg.key())

c.close()

Но я получаю следующую ошибку:

confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at test_topic2 [0] offset 1: message does not start with magic byte

Я думаю, что я что-то упустил в части продюсера Go, я был бы очень признателен, если бы кто-нибудь поделился своим опытом о том, как решить эту проблему.


person roAl    schedule 09.03.2020    source источник
comment
Ваш ключ типа avro или string?   -  person Giorgos Myrianthous    schedule 09.03.2020
comment
@Gio нет ключа   -  person OneCricketeer    schedule 09.03.2020


Ответы (1)


goavro не использует реестр схем.

Кроме того, вы используете StringEncoder, который, как я предполагаю, выводит только фрагмент строки, а не байты Avro.

StringEncoder реализует интерфейс кодировщика для строк Go, поэтому их можно использовать в качестве ключа или значения в ProducerMessage.

FWIW, я бы посоветовал протестировать потребителя с помощью kafka-avro-console-consumer, если он у вас есть

person OneCricketeer    schedule 09.03.2020