У меня есть два консольных приложения, использующих Rebus. Оба они ссылаются на сборку, в которой определены сообщения (команды и события). Консольное приложение «A» отправляет команды и прослушивает события для ведения журнала (например: отправляет CreateTCommand и ожидает TCreatedEvent). Консольное приложение «B» (которое на самом деле является приложением ASP.NET Core) слушает команды и обрабатывает их (например: сага инициируется CreateTCommand, создается агрегат, который вызывает событие TCreatedEvent). В другой DLL внутри процесса приложения «B» есть обработчик для TCreatedEvent.
Итак, у меня есть команда создания, отправленная приложением «A», и два обработчика для созданного события, один в приложении «A» и один в приложении «B».
Проблема: когда я отправляю команду из приложения «A» в первый раз, приложение «B» вызывает созданное событие, которое запускает обработчик в том же процессе. Обработчик в приложении «A» не запускается. Дальнейшие команды из приложения «A» всегда обрабатываются сагой в приложении «B», но созданные события никогда больше не попадают в обработчик в этом процессе, а обрабатываются приложением «A» !!! Иногда (я не могу понять, как воспроизвести) команды из приложения «A» не обрабатываются сагой в приложении «B» (я нахожу команду в очереди ошибок в MSMQ с исключением «сообщение с идентификатором не может быть отправлено любым обработчикам »). Иногда (очень редко) попадали оба обработчика. Но я не могу последовательно воспроизвести поведение ...
Мои ощущения по этому поводу (почти ничего не зная о Ребусе, что для меня в новинку):
- может быть проблема параллелизма? Я имею в виду: Rebus настроен для хранения подписок извне для процессов (с использованием SQL или Mongo проблема не исчезнет), поэтому я подумал, что, возможно, первый обработчик слишком быстр и отмечает событие как обработанное до того, как второй обработчик призванный
- проверяя таблицу SQL для подписок, я нахожу 5 строк (по одной для каждого типа событий, на которые я подписался в коде (используя bus.Subscribe () при запуске приложения) с тем же адресом (имя очереди, привязанное к моей локальной машине) Имя) .Проблема иметь только один адрес и два процесса пытаются его использовать?
Код конфигурации для Rebus одинаков в двух приложениях и выглядит следующим образом:
const string inputQueueAddress = "myappqueue";
var mongoClient = new MongoClient("mongodb://localhost:27017");
var mongoDatabase = mongoClient.GetDatabase("MyAppRebusPersistence");
var config = Rebus.Config.Configure.With(new NetCoreServiceCollectionContainerAdapter(services))
.Logging(l => l.Trace())
.Routing(r => r.TypeBased()
.MapAssemblyOf<AlertsCommandStackAssemblyMarker>(inputQueueAddress)
.MapAssemblyOf<AlertsQueryStackAssemblyMarker>(inputQueueAddress)
)
.Subscriptions(s => s.StoreInMongoDb(mongoDatabase, "subscriptions"))
.Sagas(s => s.StoreInMongoDb(mongoDatabase))
.Timeouts(t => t.StoreInMongoDb(mongoDatabase, "timeouts"))
.Transport(t => t.UseMsmq(inputQueueAddress));
var bus = config.Start();
bus.Subscribe<AlertDefinitionCreatedEvent>();
bus.Subscribe<AlertStateAddedEvent>();
bus.Subscribe<AlertConfigurationForEhrDefinitionAddedEvent>();
services.AddSingleton(bus);
services.AutoRegisterHandlersFromThisAssembly();
Надеюсь, кто-то может помочь, это сводит меня с ума ...
p.s .: проблема присутствует также при передаче isCentralized: true в subscription.StoreInMongoDb ().
РЕДАКТИРОВАТЬ 1. Я добавил ведение журнала консоли, и вы можете увидеть это странное поведение: https://postimg.org/image/czz5lchp9/
Первая команда отправлена успешно. Это обрабатывается сагой, и событие запускает обработчик в консольном приложении «A». Ребус говорит, что вторая команда не была отправлена никаким обработчикам, но на самом деле она была обработана сагой (я следил за кодом при отладке), и событие было обработано обработчиком в приложении «B», а не «A» ... почему? ; (
РЕДАКТИРОВАТЬ 2: Я отлаживаю исходный код Ребуса. Я заметил, что в ThreadPoolWorker.cs метод TryAsyncReceive
async void TryAsyncReceive(CancellationToken token, IDisposable parallelOperation)
{
try
{
using (parallelOperation)
using (var context = new DefaultTransactionContext())
{
var transportMessage = await ReceiveTransportMessage(token, context);
if (transportMessage == null)
{
context.Dispose();
// no need for another thread to rush in and discover that there is no message
//parallelOperation.Dispose();
_backoffStrategy.WaitNoMessage();
return;
}
_backoffStrategy.Reset();
await ProcessMessage(context, transportMessage);
}
}
после публикации TCreatedEvent приложением «B» в приложении «A» код достигает await ProcessMessage (context, transportMessage), где transportMessage является фактическим событием. Эта строка кода не достигается в процессе приложения «B». Похоже, что первый получатель сообщения удаляет его из очереди MSMQ. Как я уже сказал, я новичок в Rebus и автобусах в целом, но если это поведение соответствует дизайну, я очень озадачен ... как несколько шин в нескольких процессах могут прослушивать одну и ту же очередь ???