Axon и CompletableFuture

Я столкнулся с проблемами при попытке использовать CompletableFuture с Axon. Например:

CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            log.info("Start processing target: {}", target.toString());
            return new Event();

        }, threadPool);

future.thenAcceptAsync(event -> {
            log.info("Send Event");
            AggregateLifecycle.apply(event);
}, currentExecutor);

в thenAcceptAsync - AggregateLifecycle.apply (событие) имеет неожиданное поведение. Некоторые из моих обработчиков @EventSourcingHandler начинают обрабатывать событие дважды. Кто-нибудь знает, как это исправить?

Я читал документы, и все, что у меня есть, это:

В большинстве случаев DefaultUnitOfWork предоставит вам необходимую функциональность. Ожидается, что обработка будет происходить в одном потоке.

Итак, кажется, я должен как-то использовать методы CurrentUnitOfWork.get / set, но все еще не могу понять Axon API.


person Benjamin    schedule 22.10.2018    source источник
comment
Из этого небольшого фрагмента кода похоже, что вы генерируете и применяете события асинхронно внутри своей совокупности, верно? Я пытаюсь придумать вариант использования, в котором это можно было бы использовать, и не могу его придумать. Я не уверен, что это поддерживается. При запуске AggregateLifecycle.apply () событие должно сохраняться в хранилище событий. Затем обработчики событий обработают их синхронно или асинхронно, в зависимости от того, используете ли вы процессоры событий подписки или отслеживания.   -  person Mzzl    schedule 23.10.2018
comment
почему это невозможно? Я хочу сделать что-то отдельно в threadpool, а затем отправить событие, чтобы продолжить работу над результатом. В чем проблема?   -  person Benjamin    schedule 23.10.2018


Ответы (1)


Вы не должны apply() события асинхронно. Метод apply() вызовет внутренние методы агрегата @EventSourcingHandler и запланирует событие для публикации, когда единица работы завершится (успешно). Как Axon работает с Единицей работы (которая координирует действия при вызове отдельного обработчика сообщений), метод apply () должен вызываться в потоке, который управляет этой Единицей работы.

Если вы хотите асинхронную публикацию событий, используйте шину событий, которая использует асинхронный транспорт, и используйте процессоры отслеживания.

person Allard    schedule 23.10.2018
comment
хорошо, есть ли пример использования такого подхода?) - person Benjamin; 23.10.2018
comment
Примеры большинства функций в демонстрационных проектах на github AxonFramework: github.com/AxonFramework - person Mzzl; 23.10.2018
comment
Аллард, не могли бы вы объяснить мне, как настроить шину событий, которая использует асинхронный транспорт и обрабатывает события из diff. темы правильно? - person Benjamin; 23.10.2018
comment
когда я использую config.usingTrackingProcessors (); я потерял все данные в полях в классе Aggregate - person Benjamin; 23.10.2018
comment
Ваш агрегированный класс не должен иметь ничего общего с процессорами отслеживания. Правильно ли вы разделяете модели? Взгляните на репозиторий github.com/AxonIQ/axon-quick-start. Он содержит некоторые задания по изучению основ Axon. - person Allard; 24.10.2018