IEventProcessor не читает из концентратора событий

В настоящее время я работаю над реализацией средства чтения концентратора событий с использованием EventProcessorHost и простой реализации IEventProcessor. Я подтвердил, что данные телеметрии записываются в концентратор событий с использованием превосходного заголовка Паоло Сальватори Обозреватель служебной шины. Я успешно настроил EventProcessorHost для использования учетной записи хранения для аренды и контрольных точек. Я вижу файлы данных концентратора событий в учетной записи хранения. Проблема, которую я вижу на данный момент, заключается в том, что реализация IEventProcessor, похоже, ничего не читает из концентратора событий.

Я не получаю никаких исключений. Приложение тестовой консоли без проблем подключается к учетной записи хранения. Я заметил, что оператор ведения журнала, который я добавил в конструктор, никогда не вызывается, поэтому похоже, что приемник никогда не создается. Я чувствую, что мне не хватает чего-то простого. Может ли кто-нибудь помочь мне определить, что я пропустил? Благодарю вас!

IEventProcessor Реализация:

namespace Receiver
{
    internal class SimpleEventProcessor : IEventProcessor
    {
        private Stopwatch _checkPointStopwatch;

        public SimpleEventProcessor()
        {
            Console.WriteLine("SimpleEventProcessor created");
        }

        #region Implementation of IEventProcessor

        public Task OpenAsync(PartitionContext context)
        {
            Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}",
                context.Lease.PartitionId, context.Lease.Offset);
            _checkPointStopwatch = new Stopwatch();
            _checkPointStopwatch.Start();
            return Task.FromResult<object>(null);
        }

        public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var data in messages.Select(eventData => Encoding.UTF8.GetString(eventData.GetBytes())))
            {
                Console.WriteLine("Message received.  Partition: '{0}', Data: '{1}'", context.Lease.PartitionId,
                    data);
            }

            if (_checkPointStopwatch.Elapsed > TimeSpan.FromSeconds(30))
            {
                await context.CheckpointAsync();
                _checkPointStopwatch.Restart();
            }
        }

        public async Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine("Processor shutting down.  Partition '{0}', Reason: {1}", context.Lease.PartitionId,
                reason);

            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }

        #endregion
    }
}

Код тестовой консоли:

namespace EventHubTestConsole
{
internal class Program
{
    private static void Main(string[] args)
    {
        AsyncPump.Run((Func<Task>) MainAsync);
    }

    private static async Task MainAsync()
    {
        const string eventHubConnectionString =
            "Endpoint=<EH endpoint>;SharedAccessKeyName=<key name>;SharedAccessKey=<key>";
        const string eventHubName = "<event hub name>";
        const string storageAccountName = "<storage account name>";
        const string storageAccountKey = "<valid storage key>";
        var storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
            storageAccountName, storageAccountKey);
        Console.WriteLine("Connecting to storage account with ConnectionString: {0}", storageConnectionString);

        var eventProcessorHostName = Guid.NewGuid().ToString();
        var eventProcessorHost = new EventProcessorHost(
            eventProcessorHostName,
            eventHubName,
            EventHubConsumerGroup.DefaultGroupName,
            eventHubConnectionString,
            storageConnectionString);

        var epo = new EventProcessorOptions
        {
            MaxBatchSize = 100,
            PrefetchCount = 1,
            ReceiveTimeOut = TimeSpan.FromSeconds(20),
            InitialOffsetProvider = (name) => DateTime.Now.AddDays(-7)
        };

        epo.ExceptionReceived += OnExceptionReceived;

        await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo);

        Console.WriteLine("Receiving.  Please enter to stop worker.");
        Console.ReadLine();
    }

    public static void OnExceptionReceived(object sender, ExceptionReceivedEventArgs args)
    {
        Console.WriteLine("Event Hub exception received: {0}", args.Exception.Message);
    }
}

person MichaelMilom    schedule 29.06.2015    source источник
comment
Хм, интересно, сейчас это проблема с Azure. Ни одно из моих приложений больше не может взаимодействовать с концентратором событий Azure.   -  person Phuc H Duong    schedule 01.07.2015
comment
Я все еще могу подключиться через Service Bus Explorer. Я просмотрел соответствующий исходный код, и оказалось, что подключиться к нему намного сложнее, чем предлагают простые руководства. Поскольку я просто собираюсь использовать прямых потребителей и вернуться к слою EventProcessHost. Спасибо за комментарий.   -  person MichaelMilom    schedule 01.07.2015


Ответы (1)


Похоже, проблема связана с вашим значением для EventProcessorOptions.PrefetchCount.

Я немного изменил ваш код, как показано здесь (удалив AsyncPump и аккуратно отключив приемники). Я обнаружил, что RegisterEventProcessorAsync генерирует исключение, если PrefetchCount меньше 10.

namespace EventHubTestConsole
{
  internal class Program
  {
    private static void Main(string[] args)
    {
       const string eventHubConnectionString =
        "Endpoint=<EH endpoint>;SharedAccessKeyName=<key name>;SharedAccessKey=<key>";
       const string eventHubName = "<event hub name>";
       const string storageAccountName = "<storage account name>";
       const string storageAccountKey = "<valid storage key>";
       var storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
        storageAccountName, storageAccountKey);
       Console.WriteLine("Connecting to storage account with ConnectionString: {0}", storageConnectionString);

       var eventProcessorHostName = Guid.NewGuid().ToString();
       var eventProcessorHost = new EventProcessorHost(
         eventProcessorHostName,
         eventHubName,
         EventHubConsumerGroup.DefaultGroupName,
         eventHubConnectionString,
         storageConnectionString);

       var epo = new EventProcessorOptions
         {
           MaxBatchSize = 100,
           PrefetchCount = 10,
           ReceiveTimeOut = TimeSpan.FromSeconds(20),
           InitialOffsetProvider = (name) => DateTime.Now.AddDays(-7)
         };

       epo.ExceptionReceived += OnExceptionReceived;

       eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(epo).Wait();

       Console.WriteLine("Receiving.  Please enter to stop worker.");
       Console.ReadLine();
       eventProcessorHost.UnregisterEventProcessorAsync().Wait();
    }


    public static void OnExceptionReceived(object sender,     ExceptionReceivedEventArgs args)
    {
      Console.WriteLine("Event Hub exception received: {0}", args.Exception.Message);
    }
  }
}
person Dominic Betts    schedule 06.07.2015