В моем сценарии у меня будут пакеты событий, поступающих одновременно, а затем длительные периоды времени, когда EventHub будет простаивать. В моем процессорном клиенте я хочу проверять каждые N событий или N минут (в зависимости от того, что наступит раньше).
Вот как я настроил свой Azure.Messaging.EventHubs.EventProcessorClient:
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
//Start Stopwatch
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();
// Start the processing
await processor.StartProcessingAsync();
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(10));
Console.WriteLine($"{eventsProcessed} events have been processed");
}
В моем ProcessEventHandler я проверяю событияProcessedSinceLastCheckpoint, а также время, прошедшее на секундомере. Когда любой из них достигает своего максимума, я сбрасываю оба и отмечаю это в окне консоли:
static async Task<Task> ProcessEventHandler(ProcessEventArgs eventArgs)
{
++eventsProcessed;
++eventsProcessedSinceLastCheckpoint;
Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
// After every 100 events or 2 minutes we add a checkpoint. Whichever occurs first
if(eventsProcessedSinceLastCheckpoint >= 100 || _checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(2))
{
eventsProcessedSinceLastCheckpoint = 0;
_checkpointStopWatch.Restart();
await eventArgs.UpdateCheckpointAsync();
Console.WriteLine("> Checkpoint Set. Count Reset. Stopwatch Reset.");
}
return Task.CompletedTask;
}
Проверка переменной eventsProcessedSinceLastCheckpoint работает отлично, поскольку ProcessEventHandler запускается всякий раз, когда приходят новые события. Однако, когда EventHub бездействует, ProcessEventHandler не вызывается, поэтому в случаях, когда EventHub молчит в течение многих минут или часов, я никогда не буду проверять прошедшее время.
Я понимаю, что могу просто удалить таймер и что мой процессор должен иметь возможность обрабатывать повторяющиеся события, если между контрольными точками произойдет сбой. Но в моем сценарии (поскольку у меня будет такое долгое время простоя) я хочу воспользоваться имеющимся у меня временем и наверстать упущенное, чтобы избежать появления дополнительных дубликатов, когда я могу. Отсюда добавление таймера в качестве запасного варианта в периоды простоя.
У меня вопрос: как я могу вызвать UpdateCheckpointAsync() вне ProcessEventHandler? Этот метод существует только для ProcessEventArgs. Я не могу вызвать его непосредственно в EventProcessorClient, что было бы идеально, поскольку я могу переместить проверку таймера за пределы ProcessEventHandler в мой цикл while....