Это мой простой фрагмент кода, который пытается прочитать стандартную форму записи Avro:
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
using (var consumer = new
ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
.SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
.SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(topicName);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume();
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
Console.WriteLine(consumeResult.Value.Schema);
Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// commit final offsets and leave the group.
consumer.Close();
}
}
Как видите, я могу зарегистрировать схему, но не знаю, как получить ее значение данных.
Console.WriteLine(consumeResult.Value.Schema);
Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);
В то время как схема и данные в этом объекте consumeResult.Value
такие:
{Схема: {"тип": "запись", "имя": "Пользователь", "пространство имен": "Confluent.Kafka.Examples.AvroSpecific", "поля": [{"имя": "имя", "тип ":" строка "}, {" имя ":" избранное_число "," тип ": [" int "," null "]}, {" имя ":" любимый_цвет "," тип ": [" строка "," null "]}]}, содержимое: {name: sfs, любимое_число: 41, избранное_цвет: синий,}}
Я хочу прочитать данные контента.
consumeResult.Value
строка? Или это динамический объект?consumeResult.Value.contents["favorite_number"]
не работает? В противном случае похоже, что сначала требуется десериализация JSON. - person rene   schedule 23.04.2019