Ребус: 2 обработчика в 2 процессах. Бейте непоследовательно и поочередно

У меня есть два консольных приложения, использующих Rebus. Оба они ссылаются на сборку, в которой определены сообщения (команды и события). Консольное приложение «A» отправляет команды и прослушивает события для ведения журнала (например: отправляет CreateTCommand и ожидает TCreatedEvent). Консольное приложение «B» (которое на самом деле является приложением ASP.NET Core) слушает команды и обрабатывает их (например: сага инициируется CreateTCommand, создается агрегат, который вызывает событие TCreatedEvent). В другой DLL внутри процесса приложения «B» есть обработчик для TCreatedEvent.

Итак, у меня есть команда создания, отправленная приложением «A», и два обработчика для созданного события, один в приложении «A» и один в приложении «B».

Проблема: когда я отправляю команду из приложения «A» в первый раз, приложение «B» вызывает созданное событие, которое запускает обработчик в том же процессе. Обработчик в приложении «A» не запускается. Дальнейшие команды из приложения «A» всегда обрабатываются сагой в приложении «B», но созданные события никогда больше не попадают в обработчик в этом процессе, а обрабатываются приложением «A» !!! Иногда (я не могу понять, как воспроизвести) команды из приложения «A» не обрабатываются сагой в приложении «B» (я нахожу команду в очереди ошибок в MSMQ с исключением «сообщение с идентификатором не может быть отправлено любым обработчикам »). Иногда (очень редко) попадали оба обработчика. Но я не могу последовательно воспроизвести поведение ...

Мои ощущения по этому поводу (почти ничего не зная о Ребусе, что для меня в новинку):

  • может быть проблема параллелизма? Я имею в виду: Rebus настроен для хранения подписок извне для процессов (с использованием SQL или Mongo проблема не исчезнет), поэтому я подумал, что, возможно, первый обработчик слишком быстр и отмечает событие как обработанное до того, как второй обработчик призванный
  • проверяя таблицу SQL для подписок, я нахожу 5 строк (по одной для каждого типа событий, на которые я подписался в коде (используя bus.Subscribe () при запуске приложения) с тем же адресом (имя очереди, привязанное к моей локальной машине) Имя) .Проблема иметь только один адрес и два процесса пытаются его использовать?

Код конфигурации для Rebus одинаков в двух приложениях и выглядит следующим образом:

        const string inputQueueAddress = "myappqueue";
        var mongoClient = new MongoClient("mongodb://localhost:27017");
        var mongoDatabase = mongoClient.GetDatabase("MyAppRebusPersistence");
        var config = Rebus.Config.Configure.With(new NetCoreServiceCollectionContainerAdapter(services))
            .Logging(l => l.Trace())
            .Routing(r => r.TypeBased()
                .MapAssemblyOf<AlertsCommandStackAssemblyMarker>(inputQueueAddress)
                .MapAssemblyOf<AlertsQueryStackAssemblyMarker>(inputQueueAddress)
            )
            .Subscriptions(s => s.StoreInMongoDb(mongoDatabase, "subscriptions"))
            .Sagas(s => s.StoreInMongoDb(mongoDatabase))
            .Timeouts(t => t.StoreInMongoDb(mongoDatabase, "timeouts"))
            .Transport(t => t.UseMsmq(inputQueueAddress));

        var bus = config.Start();
        bus.Subscribe<AlertDefinitionCreatedEvent>();
        bus.Subscribe<AlertStateAddedEvent>();
        bus.Subscribe<AlertConfigurationForEhrDefinitionAddedEvent>();
        services.AddSingleton(bus);

        services.AutoRegisterHandlersFromThisAssembly();

Надеюсь, кто-то может помочь, это сводит меня с ума ...

p.s .: проблема присутствует также при передаче isCentralized: true в subscription.StoreInMongoDb ().

РЕДАКТИРОВАТЬ 1. Я добавил ведение журнала консоли, и вы можете увидеть это странное поведение: https://postimg.org/image/czz5lchp9/

Первая команда отправлена ​​успешно. Это обрабатывается сагой, и событие запускает обработчик в консольном приложении «A». Ребус говорит, что вторая команда не была отправлена ​​никаким обработчикам, но на самом деле она была обработана сагой (я следил за кодом при отладке), и событие было обработано обработчиком в приложении «B», а не «A» ... почему? ; (

РЕДАКТИРОВАТЬ 2: Я отлаживаю исходный код Ребуса. Я заметил, что в ThreadPoolWorker.cs метод TryAsyncReceive

    async void TryAsyncReceive(CancellationToken token, IDisposable parallelOperation)
    {
        try
        {
            using (parallelOperation)
            using (var context = new DefaultTransactionContext())
            {
                var transportMessage = await ReceiveTransportMessage(token, context);

                if (transportMessage == null)
                {
                    context.Dispose();

                    // no need for another thread to rush in and discover that there is no message
                    //parallelOperation.Dispose();

                    _backoffStrategy.WaitNoMessage();
                    return;
                }

                _backoffStrategy.Reset();

                await ProcessMessage(context, transportMessage);
            }
        }

после публикации TCreatedEvent приложением «B» в приложении «A» код достигает await ProcessMessage (context, transportMessage), где transportMessage является фактическим событием. Эта строка кода не достигается в процессе приложения «B». Похоже, что первый получатель сообщения удаляет его из очереди MSMQ. Как я уже сказал, я новичок в Rebus и автобусах в целом, но если это поведение соответствует дизайну, я очень озадачен ... как несколько шин в нескольких процессах могут прослушивать одну и ту же очередь ???


person Etchelon    schedule 18.11.2016    source источник


Ответы (1)


У вас никогда не должно быть двух экземпляров шины, получающих сообщения из одной очереди, если они не являются несколькими экземплярами одной и той же конечной точки.

Когда два процесса используют одну и ту же очередь ввода, они будут принимать сообщения друг от друга. Это может быть совершенно нормально, если вы реализуете шаблон конкурирующих потребителей, используя его. для равномерного распределения работы между кластером рабочих процессов, но вы не можете совместно использовать входную очередь между несколькими разными конечными точками.

Я предполагаю, что все будет казаться намного более предсказуемым, если вы позволите каждому экземпляру шины использовать свою собственную очередь ввода;)

Теперь вы говорите мне, что эти обработчики должны жить внутри основного приложения.

Нет, нет :) Я говорю вам, что вы получите непредсказуемые результаты, если позволите двум разным приложениям перехватывать сообщения друг друга.

Хотя вполне нормально, что все обработчики находятся в одном экземпляре шины (и, следовательно, могут вызываться сообщениями из одной очереди), наиболее распространенный сценарий заключается в том, что вы разделите свое приложение таким образом, чтобы оно соответствовало тому, как вы хотите развивать систему. .

Таким образом, вы можете обновлять одно приложение за раз, избегая больших обновлений типа «останови мир».

Вы делаете это, запуская несколько конечных точек, каждая из которых использует свою собственную очередь, а затем отправляете сообщения МАРШРУТ между конечными точками для связи.

Рассмотрим сценарий, в котором вы хотите отправить команду в командный процессор. Командный процессор - это конечная точка Rebus, которая получает сообщения из очереди command_processor.

В конце отправителя вы настроите «сопоставление конечных точек» (вы можете узнать больше об этом в раздел маршрутизации в вики Rebus, который может выглядеть так:

Configure.With(...)
    .Transport(t => t.UseMsmq("sender"))
    .Routing(r => {
        r.TypeBased()
            .Map<TheCommand>("command_processor");
    })
    .Start();

что позволит отправителю просто перейти

await bus.Send(new TheCommand(...));

и тогда шина будет знать, в какую очередь доставить командное сообщение.

Надеюсь, это проясняет ситуацию :)

Обратите внимание, что это очень простой случай двухточечного обмена сообщениями где одна конечная точка отправляет сообщение, которое предназначено для использования одной другой конечной точкой. Существует несколько других паттернов, с которыми Ребус может вам помочь, например: запрос / ответ и опубликовать / подписаться.

person mookid8000    schedule 18.11.2016
comment
Вау хорошо ... как я и подозревал после отладки. Как я уже сказал, я новичок в этом мире, и что касается того, как мне рассказали об использовании шины, я предвидел возможность разделить многие внутренние операции, которые наше приложение в настоящее время выполняет, на множество отдельных приложений (служб Windows), все указывающие на такая же очередь за счет использования Ребуса. Теперь вы говорите мне, что эти обработчики должны жить внутри основного приложения ... не трагедия, но добавление нового обработчика означает перекомпиляцию и перезапуск пула приложений, а не просто включение другой службы. Или это означает что-то еще, о чем я не думаю? - person Etchelon; 19.11.2016
comment
Хорошо понял. Я собираюсь попробовать что-то вроде этого: в основном веб-приложении есть синглтон rebus, настроенный для приема сообщений, подобных ExternalServiceStarted и ExternalServiceStopped. ExternalServiceStarted информирует основное веб-приложение о необходимости добавления сопоставления для определенных типов сообщений в новую очередь, чтобы новая служба могла прослушивать определенные события, которые, следовательно, будут отправляться в (1 + n) очереди, где n - количество внешних служб. . Остановленное событие удаляет сопоставление, чтобы избежать засорения очереди, которая не обрабатывается. Что вы думаете? - person Etchelon; 21.11.2016