Обновление контрольной точки смещения раздела EventHub в Azure.Messaging.EventHubs.EventProcessorClient при простое

В моем сценарии у меня будут пакеты событий, поступающих одновременно, а затем длительные периоды времени, когда EventHub будет простаивать. В моем процессорном клиенте я хочу проверять каждые N событий или N минут (в зависимости от того, что наступит раньше).

Вот как я настроил свой Azure.Messaging.EventHubs.EventProcessorClient:

EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;

//Start Stopwatch
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();

// Start the processing
await processor.StartProcessingAsync();

while (true)
{
    await Task.Delay(TimeSpan.FromSeconds(10));
    Console.WriteLine($"{eventsProcessed} events have been processed");
}

В моем ProcessEventHandler я проверяю событияProcessedSinceLastCheckpoint, а также время, прошедшее на секундомере. Когда любой из них достигает своего максимума, я сбрасываю оба и отмечаю это в окне консоли:

static async Task<Task> ProcessEventHandler(ProcessEventArgs eventArgs)
{
   ++eventsProcessed;
   ++eventsProcessedSinceLastCheckpoint;

   Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));

    // After every 100 events or 2 minutes we add a checkpoint. Whichever occurs first
    if(eventsProcessedSinceLastCheckpoint >= 100 || _checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(2))
    {
        eventsProcessedSinceLastCheckpoint = 0;
        _checkpointStopWatch.Restart();

        await eventArgs.UpdateCheckpointAsync();
        Console.WriteLine("> Checkpoint Set. Count Reset. Stopwatch Reset.");
    }
    return Task.CompletedTask;

}

Проверка переменной eventsProcessedSinceLastCheckpoint работает отлично, поскольку ProcessEventHandler запускается всякий раз, когда приходят новые события. Однако, когда EventHub бездействует, ProcessEventHandler не вызывается, поэтому в случаях, когда EventHub молчит в течение многих минут или часов, я никогда не буду проверять прошедшее время.

Я понимаю, что могу просто удалить таймер и что мой процессор должен иметь возможность обрабатывать повторяющиеся события, если между контрольными точками произойдет сбой. Но в моем сценарии (поскольку у меня будет такое долгое время простоя) я хочу воспользоваться имеющимся у меня временем и наверстать упущенное, чтобы избежать появления дополнительных дубликатов, когда я могу. Отсюда добавление таймера в качестве запасного варианта в периоды простоя.

У меня вопрос: как я могу вызвать UpdateCheckpointAsync() вне ProcessEventHandler? Этот метод существует только для ProcessEventArgs. Я не могу вызвать его непосредственно в EventProcessorClient, что было бы идеально, поскольку я могу переместить проверку таймера за пределы ProcessEventHandler в мой цикл while....


person INNVTV    schedule 23.04.2020    source источник


Ответы (1)


Установка EventHubProcessorClientOptions.MaximumWaitTime при создании экземпляра процессора позволит вызывать обработчик, когда не считываются никакие события. Когда установлено ненулевое значение, время ожидания в основном означает «отправлять мне события, как только вы их получите, но пинговать мой обработчик, если в течение этого интервала не было прочитано ни одного события».

Что касается обновления контрольных точек в этом сценарии, рекомендуемым подходом будет кэширование аргументов для последнего события, которое было отправлено обработчику, и использование его для вызова UpdateCheckpointAsync. В этом примере демонстрируется подход, обеспечивающий создание контрольной точки при остановке обработки раздела.

person Jesse Squire    schedule 23.04.2020
comment
Спасибо, Джесси. Это направило меня по правильному пути. - person INNVTV; 24.04.2020
comment
Рад, что это помогло. Нас не устраивает неуклюжесть этого подхода, и мы заинтересованы в обратной связи, которая поможет нам разработать улучшения и расставить приоритеты. Я открыл следующее для обсуждения сообществом, если вы хотите поделиться своими мыслями: github.com/Azure/azure-sdk-for-net/issues/11566 - person Jesse Squire; 24.04.2020
comment
Спасибо. Согласитесь, немного неудобно. Я могу использовать триггерные функции EventHub, поскольку многие из них встроены, но мне нужна гибкость для запуска моей собственной обработки за пределами Azure. Например, локальный сервер, на котором выполняется фоновая задача. Влезу в дискуссию! - person INNVTV; 24.04.2020