Рекомендации по предотвращению дублирования событий AggregateCreated

У меня есть следующий (Axon) агрегат:

@Aggregate
@NoArgsConstructor
public class Car{
  @AggregateIdentifier
  private String id;

  @CommandHandler
  public Car(CreateCar command){
    apply( new CarCreated(command.getId()) );
  }

  @EventSourcingHandler
  public void carCreated(CarCreated event) {
    this.id = event.getId();
  }

}

И я могу создать машину, отправив команду CreateCar с определенным идентификатором, что вызовет событие CarCreated. Это великолепно.

Однако, если я отправлю другую команду CreateCar с тем же идентификатором, команда не может быть проверена агрегатом (то, что данный идентификатор уже существует). Впоследствии он просто вызовет новое событие CarCreated. Что является ложью.

Как лучше всего убедиться, что команда CreateCar не работает, если автомобиль уже существует?

Естественно, я мог бы сначала проверить репозиторий, но это не предотвратит состояние гонки...


person dstibbe    schedule 07.11.2018    source источник
comment
использовать UUID или сгенерировать идентификатор в базе данных   -  person giorgi dvalishvili    schedule 07.11.2018
comment
и как это предотвращает два события CarCreated?   -  person dstibbe    schedule 07.11.2018
comment
Одно и то же событие создания с тем же агрегатным идентификатором нельзя сохранить в хранилище событий, поэтому эта транзакция должна завершиться ошибкой на уровне базы данных и выполнить откат. Я подозреваю, что вы используете UUID для совокупного идентификатора и на самом деле хотите, чтобы агрегаты также имели уникальное ограничение для другого идентификатора. В этом случае, что касается аксона, вы создаете новый уникальный агрегат, который просто имеет идентичную полезную нагрузку другому. Вам придется решить это с помощью блокировки и чтения таблицы запросов.   -  person Mzzl    schedule 09.11.2018
comment
Обычно это обрабатывается с помощью оптимистического параллелизма. При отправке события в вашу технологию хранения вы должны отправить индекс ожидаемого события вместе с событием. Для созданного вами события это будет 0 (для систем на основе 0). Надеюсь, ваша технология хранения поддерживает это и проверит, что событие с индексом 0 еще не было сохранено, и в противном случае произойдет сбой. Вы получаете текущий индекс событий от гидратации вашего агрегата. Видите ли, когда вы получаете события для своего агрегата (перед обработкой команды), вы просто подсчитываете количество событий и используете это количество как индекс ожидаемого события.   -  person Noel Widmer    schedule 12.11.2018


Ответы (2)


Однако, если я отправлю другую команду CreateCar с тем же идентификатором, команда не может быть проверена агрегатом (то, что данный идентификатор уже существует). Впоследствии он просто запустит новое событие CarCreated. Что является ложью.

Axon на самом деле позаботится об этом за вас. Когда агрегат публикует событие, оно не сразу публикуется для других компонентов. Он размещается в единице работы, ожидая завершения выполнения обработчика. После выполнения обработчика вызывается ряд обработчиков «подготовки фиксации». Один из них хранит агрегат (который не работает при использовании источника событий), другой — это публикация событий (в рамках транзакции).

В зависимости от того, используете ли вы Event Sourcing или нет, либо добавление экземпляра Aggregate в постоянное хранилище завершится ошибкой (повторяющийся ключ), либо публикация события создания не удастся (повторяющийся агрегатный идентификатор + порядковый номер).

person Allard    schedule 12.11.2018
comment
Надеюсь, вы скоро реализуете github.com/AxonFramework/AxonFramework/issues/653: п - person dstibbe; 12.02.2019

Как лучше всего убедиться, что команда CreateCar не работает, если автомобиль уже существует? Естественно, я мог бы сначала проверить репозиторий, но это не предотвратит состояние гонки...

Магии нет.

Если вы собираетесь избежать недопустимых записей, вам нужно либо установить блокировку хранилища данных, либо вам нужно хранилище данных с семантикой compare and swap.

Блокировка гарантирует отсутствие конфликтующих обновлений между чтением данных в хранилище и последующей записью.

lock = lock_for_id id
lock.acquire
Try:
    Option[Car] root = repository.load id
    switch root {

        case None:
            Car car = createCar ...
            repository.store car  

        case Some(car):
            // deal with the fact that the car has already been created   

    }
Finally:
    lock.release

Вы хотели бы иметь блокировку для каждого агрегата, но создание блокировок связано с теми же специфическими условиями, что и создание агрегатов. Таким образом, вы, скорее всего, получите что-то вроде грубой блокировки для ограничения доступа к операция.

Благодаря сравнению и обмену вы перемещаете управление конфликтами в сторону хранилища данных. Вместо отправки в магазин PUT вы отправляете условный PUT.

    Option[Car] root = repository.load id
    switch root {

        case None:
            Car car = createCar ...
            repository.replace car None

        case Some(car):
            // deal with the fact that the car has already been created   

    }

Блокировки нам больше не нужны, потому что мы описываем предусловие именно для магазина (например, If-None-Match: *), которое должно быть удовлетворено.

Семантика сравнения и замены обычно поддерживается хранилищами событий; «добавление» новых событий в поток выполняется путем создания запроса, определяющего ожидаемую позицию хвостового указателя, со специально закодированными значениями для идентификации случаев, когда ожидается создание потока (например, хранилище событий поддерживает ExpectedVersion.NoStream семантика).

person VoiceOfUnreason    schedule 07.11.2018