Как читать данные, специфичные для GenericRecord, в конфлюентной кафке C #

Это мой простой фрагмент кода, который пытается прочитать стандартную форму записи Avro:

using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
  using (var consumer = new 
        ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
                        .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                        .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
                        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                        .Build())
                {
      consumer.Subscribe(topicName);

      try
       {
           while (true)
             {
                try
                 {
                    var consumeResult = consumer.Consume();
                    Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
                    Console.WriteLine(consumeResult.Value.Schema);
                            Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);

                  }
                 catch (ConsumeException e)
                   {
                       Console.WriteLine($"Consume error: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // commit final offsets and leave the group.
                        consumer.Close();
                    }
                }

Как видите, я могу зарегистрировать схему, но не знаю, как получить ее значение данных.

 Console.WriteLine(consumeResult.Value.Schema);

 Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);

В то время как схема и данные в этом объекте consumeResult.Value такие:

{Схема: {"тип": "запись", "имя": "Пользователь", "пространство имен": "Confluent.Kafka.Examples.AvroSpecific", "поля": [{"имя": "имя", "тип ":" строка "}, {" имя ":" избранное_число "," тип ": [" int "," null "]}, {" имя ":" любимый_цвет "," тип ": [" строка "," null "]}]}, содержимое: {name: sfs, любимое_число: 41, избранное_цвет: синий,}}

Я хочу прочитать данные контента.


person Muhammad Faizan Khan    schedule 23.04.2019    source источник
comment
consumeResult.Value строка? Или это динамический объект? consumeResult.Value.contents["favorite_number"] не работает? В противном случае похоже, что сначала требуется десериализация JSON.   -  person rene    schedule 23.04.2019
comment
value - TValue GenericRecord, в то время как consumerResult.Value.contents [избранное_число], контент недоступен.   -  person Muhammad Faizan Khan    schedule 23.04.2019


Ответы (1)


content недоступен

Я не думаю, что Value.contents это то, что вам нужно. В частности, потому что это свойство private, как вы упомянули

Получатель определяется как словарь - Исходный код

Попробуйте consumeResult.Value["favorite_number"]


Когда вы выполняете consumeResult.Value.Schema["favorite_number"], вы получаете объект Field в схеме, а не значение поля во внешней записи. - Источник код

person OneCricketeer    schedule 23.04.2019