Как избежать параллелизма по статусу агрегатов с помощью Rebus в кластере серверов

У меня есть веб-сервис, который использует Rebus в качестве служебной шины. Ребус настроен так, как описано в этом посте. Веб-служба сбалансирована по нагрузке с кластером из двух серверов.

Эти сервисы предназначены для производственной среды, и каждая производственная машина отправляет команды для сохранения произведенных объемов и/или обновления своего состояния.
В BL я смоделировал совокупный корень для каждой машины, и он выполняет команды, выдаваемые настоящая машина. Чтобы сохранить правильный статус, Aggregate должен получать команды в той же последовательности, в которой они были отправлены, и, поскольку для этой машины нет параллелизма, в том же порядке они сохраняются на шине.

Например: машина XX отправляет команду 'добавить новую деталь в готовом виде', а затем команду 'установить остановку для технического обслуживания'. Выполняя эти команды в последовательности, вы должны иметь Aggregate XX в состоянии 'Stop', но при наличии нескольких серверных/рабочих ролей обе команды могут выполняться одновременно в одной и той же версии Совокупность. Это означает, что в зависимости от того, кто первым сохранит агрегат, у меня может быть агрегат XX с состоянием "Остановить" или "Производить детали"... это не одно и то же. .

Я ввел служебную шину, чтобы добавить масштабирование в виде количества масштабов машины и устойчивости (в случае сбоя сервера у меня будет только замедление обработки команд).

На самом деле я использую имя агрегата, например "topic" или "destinationAddress" с IAdvancedApi. , поэтому имя агрегата сохраняется в получателе транспорта. Затем я создал собственный класс транспорта, который:
1. не удаляет сообщения в процессе обработки, а переводит их в состояние InProgress.
2. для извлечения сообщений выбираются только те, которые находятся в получателе, у которого нет InProgress.

Я блуждаю: это лучший способ гарантировать, что шина выполняет команды для агрегата в той же последовательности, в которой они прибыли?


person ilcorvo    schedule 30.09.2016    source источник
comment
Не слишком ли много ты требуешь от Ребуса? т.е. пытаетесь превратить легкое решение для обмена сообщениями в мощное средство с высокой доступностью и гарантией заказов?   -  person guillaume31    schedule 03.10.2016


Ответы (3)


Решением будет какая-то блокировка вашего сводного корня, что должно произойти на уровне хранилища данных.

Например. используя оптимистическую блокировку (вероятно, реализованную с каким-то номером версии или чем-то подобным), вы будете уверены, что никогда случайно не перезапишете изменения другого узла.

Это позволит вашей совокупности либо

а) принять изменения в любом порядке (что обычно предпочтительнее — делает вашу систему более терпимой) или б) отклонить недопустимое изменение

Если агрегат отклоняет изменение, это может быть реализовано путем создания исключения. И затем в обработчике Rebus, который ловит это исключение, вы можете, например. await bus.Defer(TimeSpan.FromSeconds(5), theMessage), что приведет к повторной доставке через пять секунд.

person mookid8000    schedule 30.09.2016
comment
На данный момент я реализовал собственный транспорт: 1) При получении сообщения не удаляются, а устанавливаются в статус InProgress и удаляются только в контексте.OnCompleted/OnAborted. 2) при выборе сообщения для отправки я ищу сообщения, у которых нет ни одного получателя с тем же InProgress. - person ilcorvo; 30.09.2016
comment
Я думал о блокировке агрегата, но мне не нравится идея, что шина тратит время на попытки выполнить сообщения, которые он не может выполнить в это время. Вот почему я переместил семафор на транспортный уровень. - person ilcorvo; 05.10.2016

Вы никогда не должны полагаться на порядок сообщений в среде служебной шины/очереди/обмена сообщениями.

Когда вы действительно окажетесь в таком положении, возможно, вам придется переосмыслить свой дизайн. Во-первых, служебная шина, безусловно, не является хранилищем событий, и попытка использовать ее как таковую приведет к боли и страданиям :) --- не то, чтобы вы пытались это сделать, но я думал, что д туда кинуть.

Что касается вашего дизайна, для управления такого рода состоянием вы можете взглянуть на диспетчер процессов. Если вы не генерируете эти команды, то даже это не поможет.

Однако, учитывая ваш сценарий, кажется, что вызовы являются последовательными, но, возможно, это только ваш пример. В любом случае, как сказал mookid8000, вы либо хотите:

  • отменить недействительные изменения (с соответствующей обратной связью),
  • разрешить любой порядок сообщений, пока они действительны,
  • игнорировать сообщения вне очереди на более поздний срок.

Надеюсь, это поможет...

person Eben Roux    schedule 30.09.2016
comment
В моем сценарии вызовы определенного агрегата абсолютно последовательны, и я должен гарантировать, что они выполняются в том же порядке. - person ilcorvo; 30.09.2016
comment
Вероятно, самым простым было бы сохранить порядковый номер в сообщении (как упомянул mookid с номером версии). Тогда это вопрос игнорирования сообщения, которое не следует из зарегистрированного порядкового номера AR... - person Eben Roux; 30.09.2016
comment
Я сделал что-то подобное (пожалуйста, прочитайте комментарий к ответу Moonkid): в моем случае надежной последовательностью является позиция прибытия в очереди, и я не использую сообщения для этого AR, где для него выполняется другое. Я бродил, если есть более ориентированный на Ребус подход. - person ilcorvo; 30.09.2016
comment
Вы никогда не должны полагаться на порядок сообщений. Можете ли вы объяснить, почему вы делаете это общим утверждением? Некоторые системы, такие как RabbitMQ, гарантируют порядок в зависимости от вашей настройки. В спецификации протокола AMQP есть параграф об условиях, при которых гарантируется упорядочение. - person guillaume31; 03.10.2016
comment
@ guillaume31: Я думаю, что если коротко, то будет менее болезненно, если вы сможете уйти, не заказывая сообщения, поскольку заказанные сообщения являются своего рода узким местом. Что происходит, когда обработка сообщения не удалась? Все сообщения, следующие за этим сообщением, должны были бы ждать, так как они нуждались бы в упорядочении. Как насчет многопоточных сценариев? Довольно сложно обработать сообщение по порядку, и это несколько уменьшает любые выгоды, поскольку сообщение не может обрабатываться параллельно. Еще хуже в распределенном сценарии. - person Eben Roux; 03.10.2016
comment
@ guillaume31: При этом у меня есть проекции, работающие с моим хранилищем событий, где события обрабатываются последовательно/упорядоченно для каждой проекции. Итак, если у меня есть 3 прогноза, каждый из них будет запускаться в своем потоке, обрабатывая события последовательно для каждого прогноза. Короче говоря, упорядоченная обработка сообщений — это просто ситуация, которую я бы посоветовал избегать. - person Eben Roux; 03.10.2016
comment
Я вижу много случаев, когда требуется упорядочение, и наличие такой поддержки в вашей технологии шины сообщений — это хорошо. Кроме того, я не думаю, что вы продемонстрировали, что ОП не нуждается в заказе. Вы говорите о хранилище событий, а он говорит о командах. Прямо сейчас многие люди подхватят ваше первое предложение и остановятся на нем, приняв его за универсальную истину, но все гораздо сложнее. - person guillaume31; 03.10.2016
comment
Все сообщения, следующие за этим сообщением, должны были бы ждать, так как их нужно было заказать. - ждать до чего? Какое это имеет отношение к заказу? - person guillaume31; 03.10.2016
comment
Если у нас есть сообщение A, B, C, D и B, мы не можем продолжить обработку C и D. Или я что-то упустил? - person Eben Roux; 03.10.2016
comment
Вы имеете в виду, если C и D не имеют смысла без A и B? Тогда, конечно, вы не сможете продолжить обработку C и D, но вы можете отложить их и продолжить с E, F и так далее. Здесь нет ничего блокирующего, потребитель не остановится и не будет ждать. Ждать чего? - person guillaume31; 03.10.2016
comment
Тот факт, что сообщения должны быть в порядке, по-видимому, означает, что обработка не сможет продолжаться в случае сбоя одного из них. Но я действительно не могу ответить на этот вопрос. В любом случае, я не слишком заморачиваюсь. Если есть реальное требование, которое вы не можете обойти, то, как я уже упоминал в ответе, есть способы реализовать решение. - person Eben Roux; 03.10.2016
comment
В моем случае stop или производство штук имеют смысл в любом порядке (потому что производство заканчивается любой остановкой). Таким образом, не будет исключения проверки. Если команда выдает ошибку (не для проверки, а для необработанного исключения), вы можете пройти через другие, но тогда нам нужны компенсационные действия для выравнивания постоянства. - person ilcorvo; 05.10.2016

"точно такая же последовательность, как они были сохранены в автобусе"

Просто почему?

Будете ли вы полагаться на журналы вашего HTTP-сервера, чтобы узнать, какая команда на самом деле достигла агрегата первой? Нет, потому что это совершенно ненадежно, как и с хотя бы одной гарантией доставки, и это тоже не имеет значения.

Именно ваше хранилище событий и/или нормальное состояние сохраняемости должно быть источником истины, когда дело доходит до знания последовательности событий. Порядок команд не должен иметь большого значения.

Предполагая оптимистичный параллелизм, если агрегату не разрешен переход от A к C, тогда он должен охранять этот инвариант, и когда команда TransitionToStateC поразит его в состоянии A, он будет просто отклонен.

С другой стороны, если A->C->B переходов действительны, и это порядок, полученный вашим агрегатом, то это то, что произошло с точки зрения домена. На самом деле не должно иметь значения, какая команда была опубликована на шине первой, точно так же, как не имеет значения, какой пользователь первым выполнил команду из пользовательского интерфейса.

«В моем сценарии вызовы для определенного агрегата абсолютно последовательны, и я должен гарантировать, что они выполняются в том же порядке».

Почему тогда вы выполняете их асинхронно и потенциально одновременно, публикуя на шине? В основном вы говорите, что вызовы являются последовательными и не могут обрабатываться одновременно. Это означает, что все должно быть синхронно, потому что от параллелизма нет никакой потенциальной выгоды.

Почему:

executeAsync(command1)
executeAsync(command2)
executeAsync(command3)

Когда вы хотите:

execute(command1)
execute(command2)
execute(command3)

У вас должно быть одно командное сообщение, и обработчик этого сообщения выполняет несколько команд для совокупности. Опять же, в этом случае я бы просто создал одну операцию на агрегате, которая выполняет все переходы.

person plalx    schedule 30.09.2016
comment
Я понимаю, что я был слишком общим в описании варианта использования. Я обновлю описание. - person ilcorvo; 05.10.2016
comment
@ilcorvo Почему бы вам просто не моделировать актеров вместо того, чтобы полагаться на шину событий? Каждый агрегат представляет собой однопоточный актор с почтовым ящиком (синхронизированной очередью сообщений). Кажется, это легко решает вашу проблему. Нет смысла иметь конкурирующих потребителей, если вам нужны сообщения по порядку. - person plalx; 06.10.2016
comment
Мне нравится эта идея, но как вы можете согласовать это с отказоустойчивостью среды в случае сбоя сервера? Имея 2 сервера, потребляющих сообщения из одной и той же очереди, я могу заявить, что в случае сбоя 1 сервера система может полагаться на другой. - person ilcorvo; 07.10.2016
comment
@ilcorvo Я не уверен, что то, чего вы пытаетесь достичь, возможно даже с таким уровнем отказоустойчивости. В распределенных системах вы можете быть либо непротиворечивыми, либо доступными, но не можете быть и тем, и другим одновременно. - person plalx; 07.10.2016