Push-сообщения в формате avro через golang в kafka

Я пытался отправить несколько сообщений в kafka через confluent go client, но загвоздка в том, что сообщения нужно отправлять в формате avro. То же самое может быть легко достигнуто в приложении java springboot.

У меня есть подозрение, что все это возможно через confluent go client. Хотя у меня есть альтернатива отправке этих сообщений через слитный прокси-сервер для отдыха, но это будет означать 3-4-кратное снижение производительности, от чего я бы отказался.

Я попробовал goAvro конвертировать сообщения в avro. Хотя я не получаю никаких ошибок при создании, но часть данных не сохраняется в формате avro. введите здесь описание изображения

avroCodec, err := goavro.NewCodec(schemaString)

if err != nil {
    log.Panic(err.Error())
}

appointmentByte,_ := json.Marshal(appointment)

native, _, _ := avroCodec.NativeFromTextual(appointmentByte)

binaryValue, _ := avroCodec.BinaryFromNative(nil,  native)

var recordValue []byte

schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(id))

recordValue = append(recordValue, byte(0))
recordValue = append(recordValue, schemaIDBytes...)
recordValue = append(recordValue, binaryValue...)

log.Print(recordValue)

key, _ := uuid.NewUUID()

fmt.Print(key.String())
p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{
        Topic: &topic, Partition: kafka.PartitionAny},
    Key: []byte(key.String()), Value: recordValue}, nil)

person Rahul Singh    schedule 30.01.2020    source источник
comment
Можете ли вы попробовать сделать минимальный, отдельный кусок кода и показать нам.   -  person nilsocket    schedule 30.01.2020
comment
@nilsocket приведенный выше код обрабатывает часть кодирования сообщения.   -  person Rahul Singh    schedule 30.01.2020
comment
{ тема: назначение-значение, ключ: 4efcfb26-42cd-11ea-a7f5-3af9d398b113, значение: +, раздел: 0, смещение: 4}   -  person Rahul Singh    schedule 30.01.2020
comment
Необработанная версия данных, хранящихся в kafka, описана выше.   -  person Rahul Singh    schedule 30.01.2020
comment
json.Marshal(appointment) создает json, а не Avro... Откуда вы берете ID?   -  person OneCricketeer    schedule 30.01.2020
comment
@cricket_007 cricket_007 Я получил идентификатор из реестра схем, единственная проблема, с которой я столкнулся, - это способ преобразовать мою структуру go в двоичный файл avro, что и ожидалось.   -  person Rahul Singh    schedule 30.01.2020
comment
Видел это? github.com/lensesio/schema-registry   -  person OneCricketeer    schedule 30.01.2020
comment
@cricket_007 я уже настроил реестр схемы. Единственная проблема, с которой я столкнулся, была с двоичным кодированием avro. github.com/hamba/avro помог мне это сделать.   -  person Rahul Singh    schedule 31.01.2020
comment
@cricket_007 код работает как шарм при замене goavro и json.marshal   -  person Rahul Singh    schedule 31.01.2020
comment
Тогда вы не используете Avro в этот момент?   -  person OneCricketeer    schedule 01.02.2020
comment
@cricket_007 cricket_007 Я использую avro. Проблема решена, теперь я могу передавать закодированные данные avro в kafka. Спасибо и ура.   -  person Rahul Singh    schedule 01.02.2020
comment
Не стесняйтесь опубликовать свое решение в качестве ответа   -  person OneCricketeer    schedule 02.02.2020


Ответы (1)


Вы можете поискать на Github решения вашей проблемы. В настоящее время он не является частью проекта, но над ним ведется работа.

https://github.com/confluentinc/confluent-kafka-go/issues/ 69

person OneCricketeer    schedule 30.01.2020