Ограничение пропускной способности Masstransit RPC (RabbitMq)

Мы используем Masstransit с RabbitMq для создания RPC от одного компонента нашей системы к другим.

Недавно мы столкнулись с ограничением пропускной способности на стороне клиента, измерив около 80 завершенных ответов в секунду.

Пытаясь выяснить, в чем проблема, я обнаружил, что запросы обрабатывались сервером RPC быстро, затем ответы помещались в очередь обратного вызова, а затем скорость обработки очереди составляла 80 М\с.

Это ограничение только на стороне клиента. Запуск другого процесса того же клиентского приложения на той же машине удваивает пропускную способность запросов на стороне сервера, но затем я вижу две очереди обратного вызова, заполненные сообщениями, каждая из которых потребляет те же 80 М\с.

Мы используем один экземпляр IBus

builder.Register(c =>
{
    var busSettings = c.Resolve<RabbitSettings>();
    var busControl = MassTransitBus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(busSettings.Host), h =>
            {
                h.Username(busSettings.Username);
                h.Password(busSettings.Password);
            });

            cfg.UseSerilog();

            cfg.Send<IProcessorContext>(x =>
            {
               x.UseCorrelationId(context => context.Scope.CommandContext.CommandId);
            });

    }
);

return busControl;
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();

Логика отправки выглядит так:

var busResponse = await _bus.Request<TRequest, TResult>(
                destinationAddress: _settings.Host.GetServiceUrl<TCommand>(queueType),
                message: commandContext,
                cancellationToken: default(CancellationToken),
                timeout: TimeSpan.FromSeconds(_settings.Timeout),
                callback: p => { p.WithPriority(priority); });

Кто-нибудь сталкивался с проблемой такого рода? Я предполагаю, что в логике отправки ответов есть какой-то программный предел. Это может быть максимальный размер пула потоков или размер буфера, а также счетчик предварительной выборки очереди ответов. Я пытался поиграться с размером пула потоков .Net, но ничего не помогло.

Я новичок в Masstransit и буду признателен за любую помощь в решении моей проблемы. Надеюсь, это можно исправить в конфигурации


person Юрий Ульянец    schedule 25.10.2018    source источник


Ответы (1)


Есть несколько вещей, которые вы можете попробовать, чтобы оптимизировать производительность. Я бы также посоветовал проверить MassTransit-Benchmark и запустить его в вашей среде — это даст вам представление о возможной пропускной способности вашего брокера. Это позволяет вам настраивать такие параметры, как количество упреждающих выборок, параллелизм и т. д., чтобы увидеть, как они влияют на ваши результаты.

Кроме того, я бы предложил использовать один из клиентов запросов, чтобы уменьшить настройку для каждого запроса/ответа. Например, создайте клиент запроса один раз, а затем используйте этот же клиент для каждого запроса.

var serviceUrl = yourMethodToGetIt<TRequest>(...); var client = Bus.CreateRequestClient<TRequest>(serviceUrl);

Затем используйте этот экземпляр IRequestClient<TRequest> всякий раз, когда вам нужно выполнить запрос.

Response<Value> response = await client.GetResponse<TResponse>(new Request());

Поскольку вы просто используете RPC, я настоятельно рекомендую настроить очередь конечной точки получения на неустойчивую, чтобы избежать записи запросов RPC на диск. И настройте счетчик предварительной выборки шины на более высокое значение (выше, чем максимальное количество одновременных запросов, которые вы можете иметь в 2 раза), чтобы гарантировать, что ответы всегда доставляются непосредственно вашему ожидающему потребителю ответа (это внутренняя вещь того, как RabbitMQ доставляет сообщения).

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.PrefetchCount = 1000; }

person Chris Patterson    schedule 25.10.2018
comment
Ссылка на тест: github.com/MassTransit/MassTransit-Benchmark - person Chris Patterson; 25.10.2018