Rebus Sagas, Revisions и DeferredMessages

Я пытаюсь настроить Saga, которая работает следующим образом:

  1. Saga получает сообщение о заказе на доставку. Этот заказ на доставку имеет свойство RouteId, которое я могу использовать для сопоставления заказов на доставку для одного и того же «грузовика».
  2. Эти заказы на доставку создаются другой системой, которая может использовать пакетный процесс для отправки этих заказов. Но эта система не может группировать заказы на доставку по одному и тому же адресу.
  3. Через какое-то количество секунд я отправил еще одно сообщение только с этим RouteId. Мне нужно захватить все заказы на доставку полученного RouteId, сгруппировать их по адресу, перевести их в другой объект и отправить в другую веб-службу.

Но я столкнулся с двумя проблемами:

  1. Если я отправляю два сообщения «одновременно» первому обработчику, каждое из них приходит, и даже со свойствами, которые коррелируют эти сообщения, свойство IsNew не изменяется после обработки первого сообщения.
  2. Во втором обработчике я хочу получить доступ ко всем данным, относящимся к этой саге, но не могу, потому что данные кажутся данными, поскольку они были отложены в редакции этих сообщений.

Соответствующий код:

Конфигурация шины для саги

Bus = Configure.With(Activator)
   .Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
   .Logging(l => l.ColoredConsole())
   .Routing(r => r.TypeBased().MapAssemblyOf<IEventContract(publisherQueue))
   .Sagas(s => {
       s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");
          if (enforceExclusiveAccess)
          {
              s.EnforceExclusiveAccess();
          }
       })
   .Options(o =>
       {
         if (maxDegreeOfParallelism > 0)
         {
            o.SetMaxParallelism(maxDegreeOfParallelism);
         }
         if (maxNumberOfWorkers > 0)
         {
            o.SetNumberOfWorkers(maxNumberOfWorkers);
         }
      })
   .Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
   .Start();

Класс СагаДата:

public class RouteListSagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }

    private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();

    public long RoutePlanId { get; set; }

    public IEnumerable<LisaShippingActivity> ShippingActivities => _shippingActivities;
    public bool SentToLisa { get; set; }

    public void AddShippingActivity(LisaShippingActivity shippingActivity)
    {
        if (!_shippingActivities.Any(x => x.Equals(shippingActivity)))
        {
            _shippingActivities.Add(shippingActivity);
        }
    }

    public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities() => LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
}

Метод коррелатемессажес

protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
    config.Correlate<ShippingOrder>(x => x.RoutePlanId, y => y.RoutePlanId);
    config.Correlate<VerifyRouteListIsComplete>(x => x.RoutePlanId, y => y.RoutePlanId);
}

Дескриптор сообщения, которое должно инициировать сагу и отправить сообщение DefferedMessage, если сага является новой.

public async Task Handle(ShippingOrder message)
{
  try
  {
    var lisaActivity = message.AsLisaShippingActivity(_commissionerUserName);

    if (Data.ShippingActivities.Contains(lisaActivity))
      return;

    Data.RoutePlanId = message.RoutePlanId;
    Data.AddShippingActivity(lisaActivity);
    var delay = TimeSpan.FromSeconds(_lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60);

    if (IsNew)
    {
      await _serviceBus.DeferLocal(delay, new VerifyRouteListIsComplete(message.RoutePlanId), _environment);
    }
 }
 catch (Exception err)
 {
   Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
   throw;
 }
}

И, наконец, обработчик отложенного сообщения:

public Task Handle(VerifyRouteListIsComplete message)
{
  try
  {
    if (!Data.SentToLisa)
    {
      var lisaData = Data.GroupShippingActivitiesToLisaActivities();

      _lisaService.SyncRouteList(lisaData).Wait();

      Data.SentToLisa = true;
    }
    MarkAsComplete();
    return Task.CompletedTask;
  }
  catch (Exception err)
  {
    Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);
    _serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]).Wait();
    MarkAsUnchanged();
    return Task.CompletedTask;
  }
}

Любая помощь приветствуется!


person Gerson Dias    schedule 21.05.2018    source источник


Ответы (1)


Я не уверен, что правильно понимаю симптомы, которые вы испытываете.

Если я отправляю два сообщения «одновременно» первому обработчику, каждое из них приходит, и даже со свойствами, которые коррелируют эти сообщения, свойство IsNew не изменяется после обработки первого сообщения.

Если вызывается EnforceExclusiveAccess, я ожидаю, что сообщения будут обрабатываться последовательно, первое с IsNew == true, а второе с IsNew == false.

Если нет, я бы ожидал, что оба сообщения будут обрабатываться параллельно с IsNew == true, но затем — когда данные sage вставлены — я бы ожидал, что одно из них будет выполнено успешно, а другое — с ошибкой с ConcurrencyException.

После ConcurrencyException сообщение будет обработано снова, на этот раз с IsNew == false.

Разве это не то, что вы испытываете?

Во втором обработчике я хочу получить доступ ко всем данным, относящимся к этой саге, но не могу, потому что данные кажутся данными, поскольку они были отложены в редакции этих сообщений.

Вы говорите, что данные в данных саги, похоже, находятся в том же состоянии, в котором они находились, когда сообщение VerifyRouteListIsComplete было отложено?

Это звучит очень странно, а также довольно маловероятно ???? не могли бы вы попробовать еще раз и посмотреть, так ли это на самом деле?


ОБНОВЛЕНИЕ: я выяснил, почему вы испытываете такое странное поведение: вы случайно настроили свой экземпляр обработчика саги для повторного использования в сообщениях.

Вы сделали это, зарегистрировав его следующим образом (ВНИМАНИЕ: не делайте этого!):

_sagaHandler = new ShippingOrderSagaHandler(_subscriber);

_subscriber.Subscribe<ShippingOrderMessage>(_sagaHandler);
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(_sagaHandler);

где метод Subscribe затем вызывает BuiltinHandlerActivator (ВНИМАНИЕ: не делайте этого!):

activator.Register(() => handlerInstance);

Причина, по которой это плохо (особенно для обработчика саги), заключается в том, что сам экземпляр обработчика имеет состояние — он имеет свойство Data, содержащее текущее состояние процесса, а также свойство IsNew.

Что вы должны делать ВСЕГДА, так это гарантировать, что новый экземпляр обработчика создается каждый раз, когда приходит сообщение — ваш код должен быть изменен на что-то вроде этого:

_subscriber.Subscribe<ShippingOrderMessage>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();

что можно сделать, если реализация Subscribe изменится на это:

public async Task Subscribe<T>(Func<IHandleMessages<T>> getHandler)
{
    _activator.Register((bus, context) => getHandler());
    await _activator.Bus.Subscribe<T>();
}

Это решит вашу проблему с эксклюзивным доступом :)

Есть еще одна проблема с вашим кодом: у вас есть потенциальное состояние гонки между регистрацией вашего обработчика и запуском экземпляра шины абонента, потому что теоретически вы можете потерпеть неудачу и начать получать сообщения между запуском шины и регистрацией вашего обработчика.

Вы должны изменить свой код, чтобы убедиться, что все обработчики зарегистрированы до того, как вы запустите шину (и, таким образом, начнете получать сообщения).

person mookid8000    schedule 21.05.2018
comment
Я загрузил репозиторий git по адресу github.com/GersonDias/RebusSagaConcurrencyStackOverflow, который воспроизводит мою архитектуру и ошибку, которую я сталкиваюсь. Если вы запустите проект Rebus.Publisher, он опубликует 4 сообщения в очередь. После этого запускается проект Rebus.Subscriber, и вы увидите в базе данных 4 отложенных сообщения, кроме того, что код для отсрочки сообщения находится внутри if(IsNew). Я также вызвал EnforceExclusiveAccess и не заметил исключения параллелизма. Можете ли вы попытаться взглянуть на этот код, чтобы помочь мне понять, что я сделал неправильно? Я ожидал того же, что и ты - person Gerson Dias; 22.05.2018
comment
но, симптомы именно такие, сообщения обрабатываются не последовательно, а сага вставляется/обновляется почти корректно (не могу быть уверен, но чувствую, что что-то напутал, особенно с коллекциями и правильной корреляцией на основе свойств сообщения). - person Gerson Dias; 22.05.2018
comment
Должна ли проблема заключаться в том, что я отправляю много сообщений типа, определенного в интерфейсе IAmInitiatedBy? - person Gerson Dias; 24.05.2018
comment
Нет, это не должно иметь значения. Если вы можете воспроизвести проблему, например. в модульном тесте или небольшом консольном приложении, я буду рад отладить его для вас. - person mookid8000; 24.05.2018
comment
Большое спасибо, @mookid8000! В этом репозитории github.com/GersonDias/RebusSagaConcurrencyStackOverflow вы можете увидеть проблему... Один тест, который Я использую этот код, чтобы запустить консольное приложение Rebus.Publisher для отправки сообщений, и они запускают проект Rebus.Subscriber. Вы увидите в базе данных 4 отложенных сообщения, кроме if (IsNew) перед вызовом для отправки сообщения... - person Gerson Dias; 25.05.2018