Создать задержку между двумя чтениями сообщений очереди?

Я использую очереди Azure для выполнения массового импорта. Я использую WebJobs для выполнения процесса в фоновом режиме. Очередь удаляется из очереди очень часто. Как создать задержку между чтением 2 сообщений?

Вот как я добавляю сообщение в очередь

public async Task<bool> Handle(CreateFileUploadCommand message)
{
    var queueClient = _queueService.GetQueueClient(Constants.Queues.ImportQueue);

    var brokeredMessage = new BrokeredMessage(JsonConvert.SerializeObject(new ProcessFileUploadMessage
    {
        TenantId = message.TenantId,
        FileExtension = message.FileExtension,
        FileName = message.Name,
        DeviceId = message.DeviceId,
        SessionId = message.SessionId,
        UserId = message.UserId,
        OutletId = message.OutletId,
        CorrelationId = message.CorrelationId,

    }))
    {
        ContentType = "application/json",
    };

    await queueClient.SendAsync(brokeredMessage);

    return true;
}

А ниже функция WebJobs.

public class Functions
{
    private readonly IValueProvider _valueProvider;
    public Functions(IValueProvider valueProvider)
    {
        _valueProvider = valueProvider;
    }

    public async Task ProcessQueueMessage([ServiceBusTrigger(Constants.Constants.Queues.ImportQueue)] BrokeredMessage message,
    TextWriter logger)
    {

        var queueMessage = message.GetBody<string>();

        using (var client = new HttpClient())
        {
            client.BaseAddress = new Uri(_valueProvider.Get("ServiceBaseUri"));

            var stringContent = new StringContent(queueMessage, Encoding.UTF8, "application/json");

            var result = await client.PostAsync(RestfulUrls.ImportMenu.ProcessUrl, stringContent);

            if (result.IsSuccessStatusCode)
            {
                await message.CompleteAsync();
            }
            else
            {
                await message.AbandonAsync();
            }
        }
    }
}

comment
Любое обновление? Если вы считаете, что мой ответ полезен / полезен. Пожалуйста, отметьте его как ответ, чтобы другие люди могли извлечь из него пользу.   -  person Brando Zhang    schedule 15.06.2017


Ответы (1)


Насколько я знаю, azure webjobs sdk позволяет выполнять параллельную обработку в одном экземпляре (по умолчанию 16).

Если вы запустите свои веб-задания, он прочитает 16 сообщений очереди (загляните и вызовет Complete в сообщении, если функция завершится успешно, или вызовет Abandon) и создаст 16 процессов для одновременного выполнения функции триггера. Таким образом, вы чувствуете, что очередь очень часто удаляется из очереди.

Если вы хотите отключить одновременную обработку в одном экземпляре.

Я предлагаю вам установить для ServiceBusConfiguration MessageOptions.MaxConcurrentCalls значение 1.

Более подробную информацию вы можете найти в приведенных ниже кодах:

В программе.cs:

JobHostConfiguration config = new JobHostConfiguration();
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration();
serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1;
config.UseServiceBus(serviceBusConfig);

JobHost host = new JobHost(config);
host.RunAndBlock();

Если вы хотите создать задержку между чтением двух сообщений, я предлагаю вам создать собственный ServiceBusConfiguration.MessagingProvider.

Он содержит метод CompleteProcessingMessageAsync, этот метод завершает обработку указанного сообщения после вызова функции задания.

Я предлагаю вам добавить метод thread.sleep в CompleteProcessingMessageAsync, чтобы добиться задержки чтения.

Более подробно вы можете обратиться к приведенному ниже примеру кода:

CustomMessagingProvider.cs:

Примечание. Я переопределяю коды методов CompleteProcessingMessageAsync.

 public class CustomMessagingProvider : MessagingProvider
    {
        private readonly ServiceBusConfiguration _config;

        public CustomMessagingProvider(ServiceBusConfiguration config)
            : base(config)
        {
            _config = config;
        }

        public override NamespaceManager CreateNamespaceManager(string connectionStringName = null)
        {
            // you could return your own NamespaceManager here, which would be used
            // globally
            return base.CreateNamespaceManager(connectionStringName);
        }

        public override MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null)
        {
            // you could return a customized (or new) MessagingFactory here per entity
            return base.CreateMessagingFactory(entityPath, connectionStringName);
        }

        public override MessageProcessor CreateMessageProcessor(string entityPath)
        {
            // demonstrates how to plug in a custom MessageProcessor
            // you could use the global MessageOptions, or use different
            // options per entity
            return new CustomMessageProcessor(_config.MessageOptions);
        }

        private class CustomMessageProcessor : MessageProcessor
        {
            public CustomMessageProcessor(OnMessageOptions messageOptions)
                : base(messageOptions)
            {
            }

            public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
            {
                // intercept messages before the job function is invoked
                return base.BeginProcessingMessageAsync(message, cancellationToken);
            }

            public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
            {
                if (result.Succeeded)
                {
                    if (!MessageOptions.AutoComplete)
                    {
                        // AutoComplete is true by default, but if set to false
                        // we need to complete the message
                        cancellationToken.ThrowIfCancellationRequested();


                        await message.CompleteAsync();

                        Console.WriteLine("Begin sleep");
                        //Sleep 5 seconds
                        Thread.Sleep(5000);
                        Console.WriteLine("Sleep 5 seconds");

                    }
                }
                else
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    await message.AbandonAsync();
                }
            }
        }
    }

Основной метод Program.cs:

 static void Main()
        {
            var config = new JobHostConfiguration();

            if (config.IsDevelopment)
            {
                config.UseDevelopmentSettings();
            }

            var sbConfig = new ServiceBusConfiguration
            {
                MessageOptions = new OnMessageOptions
                {
                    AutoComplete = false,
                    MaxConcurrentCalls = 1
                }
            };
            sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
            config.UseServiceBus(sbConfig);
            var host = new JobHost(config);

            // The following code ensures that the WebJob will be running continuously
            host.RunAndBlock();
        }

Результат: введите здесь описание изображения

person Brando Zhang    schedule 05.06.2017
comment
Поскольку это метод async, вы должны заменить Thread.Sleep() на await Task.Delay(). Таким образом, основной поток не полностью заблокирован и может тем временем работать над другими задачами. - person Oliver; 30.06.2017