Я пытаюсь настроить Saga, которая работает следующим образом:
- Saga получает сообщение о заказе на доставку. Этот заказ на доставку имеет свойство RouteId, которое я могу использовать для сопоставления заказов на доставку для одного и того же «грузовика».
- Эти заказы на доставку создаются другой системой, которая может использовать пакетный процесс для отправки этих заказов. Но эта система не может группировать заказы на доставку по одному и тому же адресу.
- Через какое-то количество секунд я отправил еще одно сообщение только с этим RouteId. Мне нужно захватить все заказы на доставку полученного RouteId, сгруппировать их по адресу, перевести их в другой объект и отправить в другую веб-службу.
Но я столкнулся с двумя проблемами:
- Если я отправляю два сообщения «одновременно» первому обработчику, каждое из них приходит, и даже со свойствами, которые коррелируют эти сообщения, свойство IsNew не изменяется после обработки первого сообщения.
- Во втором обработчике я хочу получить доступ ко всем данным, относящимся к этой саге, но не могу, потому что данные кажутся данными, поскольку они были отложены в редакции этих сообщений.
Соответствующий код:
Конфигурация шины для саги
Bus = Configure.With(Activator)
.Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
.Logging(l => l.ColoredConsole())
.Routing(r => r.TypeBased().MapAssemblyOf<IEventContract(publisherQueue))
.Sagas(s => {
s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");
if (enforceExclusiveAccess)
{
s.EnforceExclusiveAccess();
}
})
.Options(o =>
{
if (maxDegreeOfParallelism > 0)
{
o.SetMaxParallelism(maxDegreeOfParallelism);
}
if (maxNumberOfWorkers > 0)
{
o.SetNumberOfWorkers(maxNumberOfWorkers);
}
})
.Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
.Start();
Класс СагаДата:
public class RouteListSagaData : ISagaData
{
public Guid Id { get; set; }
public int Revision { get; set; }
private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();
public long RoutePlanId { get; set; }
public IEnumerable<LisaShippingActivity> ShippingActivities => _shippingActivities;
public bool SentToLisa { get; set; }
public void AddShippingActivity(LisaShippingActivity shippingActivity)
{
if (!_shippingActivities.Any(x => x.Equals(shippingActivity)))
{
_shippingActivities.Add(shippingActivity);
}
}
public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities() => LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
}
Метод коррелатемессажес
protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
config.Correlate<ShippingOrder>(x => x.RoutePlanId, y => y.RoutePlanId);
config.Correlate<VerifyRouteListIsComplete>(x => x.RoutePlanId, y => y.RoutePlanId);
}
Дескриптор сообщения, которое должно инициировать сагу и отправить сообщение DefferedMessage, если сага является новой.
public async Task Handle(ShippingOrder message)
{
try
{
var lisaActivity = message.AsLisaShippingActivity(_commissionerUserName);
if (Data.ShippingActivities.Contains(lisaActivity))
return;
Data.RoutePlanId = message.RoutePlanId;
Data.AddShippingActivity(lisaActivity);
var delay = TimeSpan.FromSeconds(_lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60);
if (IsNew)
{
await _serviceBus.DeferLocal(delay, new VerifyRouteListIsComplete(message.RoutePlanId), _environment);
}
}
catch (Exception err)
{
Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
throw;
}
}
И, наконец, обработчик отложенного сообщения:
public Task Handle(VerifyRouteListIsComplete message)
{
try
{
if (!Data.SentToLisa)
{
var lisaData = Data.GroupShippingActivitiesToLisaActivities();
_lisaService.SyncRouteList(lisaData).Wait();
Data.SentToLisa = true;
}
MarkAsComplete();
return Task.CompletedTask;
}
catch (Exception err)
{
Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);
_serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]).Wait();
MarkAsUnchanged();
return Task.CompletedTask;
}
}
Любая помощь приветствуется!