Насколько я знаю, azure webjobs sdk позволяет выполнять параллельную обработку в одном экземпляре (по умолчанию 16).
Если вы запустите свои веб-задания, он прочитает 16 сообщений очереди (загляните и вызовет Complete в сообщении, если функция завершится успешно, или вызовет Abandon) и создаст 16 процессов для одновременного выполнения функции триггера. Таким образом, вы чувствуете, что очередь очень часто удаляется из очереди.
Если вы хотите отключить одновременную обработку в одном экземпляре.
Я предлагаю вам установить для ServiceBusConfiguration MessageOptions.MaxConcurrentCalls значение 1.
Более подробную информацию вы можете найти в приведенных ниже кодах:
В программе.cs:
JobHostConfiguration config = new JobHostConfiguration();
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration();
serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1;
config.UseServiceBus(serviceBusConfig);
JobHost host = new JobHost(config);
host.RunAndBlock();
Если вы хотите создать задержку между чтением двух сообщений, я предлагаю вам создать собственный ServiceBusConfiguration.MessagingProvider.
Он содержит метод CompleteProcessingMessageAsync, этот метод завершает обработку указанного сообщения после вызова функции задания.
Я предлагаю вам добавить метод thread.sleep в CompleteProcessingMessageAsync, чтобы добиться задержки чтения.
Более подробно вы можете обратиться к приведенному ниже примеру кода:
CustomMessagingProvider.cs:
Примечание. Я переопределяю коды методов CompleteProcessingMessageAsync.
public class CustomMessagingProvider : MessagingProvider
{
private readonly ServiceBusConfiguration _config;
public CustomMessagingProvider(ServiceBusConfiguration config)
: base(config)
{
_config = config;
}
public override NamespaceManager CreateNamespaceManager(string connectionStringName = null)
{
// you could return your own NamespaceManager here, which would be used
// globally
return base.CreateNamespaceManager(connectionStringName);
}
public override MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null)
{
// you could return a customized (or new) MessagingFactory here per entity
return base.CreateMessagingFactory(entityPath, connectionStringName);
}
public override MessageProcessor CreateMessageProcessor(string entityPath)
{
// demonstrates how to plug in a custom MessageProcessor
// you could use the global MessageOptions, or use different
// options per entity
return new CustomMessageProcessor(_config.MessageOptions);
}
private class CustomMessageProcessor : MessageProcessor
{
public CustomMessageProcessor(OnMessageOptions messageOptions)
: base(messageOptions)
{
}
public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
{
// intercept messages before the job function is invoked
return base.BeginProcessingMessageAsync(message, cancellationToken);
}
public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
{
if (result.Succeeded)
{
if (!MessageOptions.AutoComplete)
{
// AutoComplete is true by default, but if set to false
// we need to complete the message
cancellationToken.ThrowIfCancellationRequested();
await message.CompleteAsync();
Console.WriteLine("Begin sleep");
//Sleep 5 seconds
Thread.Sleep(5000);
Console.WriteLine("Sleep 5 seconds");
}
}
else
{
cancellationToken.ThrowIfCancellationRequested();
await message.AbandonAsync();
}
}
}
}
Основной метод Program.cs:
static void Main()
{
var config = new JobHostConfiguration();
if (config.IsDevelopment)
{
config.UseDevelopmentSettings();
}
var sbConfig = new ServiceBusConfiguration
{
MessageOptions = new OnMessageOptions
{
AutoComplete = false,
MaxConcurrentCalls = 1
}
};
sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
config.UseServiceBus(sbConfig);
var host = new JobHost(config);
// The following code ensures that the WebJob will be running continuously
host.RunAndBlock();
}
Результат: ![введите здесь описание изображения](https://i.stack.imgur.com/yBONj.png)
person
Brando Zhang
schedule
05.06.2017