Как делать запросы к коллекции акторов

У меня есть акторная система, которая на данный момент принимает команды/сообщения. Состояние этих акторов сохраняется Akka.Persistance. Теперь мы хотим построить систему запросов для этой системы акторов. По сути, наша проблема заключается в том, что мы хотим получить совокупность/список всех состояний этих конкретных акторов. Хотя я строго не подписываюсь на шаблон CQRS, я думаю, что это может быть аккуратный способ сделать это.

Мои первоначальные мысли состояли в том, чтобы иметь актора для запросов, который содержит как часть своего состояния совокупность состояний других акторов, выполняющих «запись данных». И для этого этот актор будет подписываться на акторов, которые его интересуют, и эти акторы будут просто отправлять актеру запроса свои состояния, когда они претерпевают какое-то изменение состояния. Это способ сделать это? Есть лучший способ это сделать?


person Lutando    schedule 26.01.2017    source источник


Ответы (2)


Моя рекомендация для реализации этого типа шаблона — использовать комбинацию обмена сообщениями pub-sub и push-and-pull для ваших актеров.

Для каждого «агрегата» этот актор должен иметь возможность подписаться на события от отдельных дочерних акторов, которые вы хотите запросить. Всякий раз, когда состояние дочернего элемента изменяется, сообщение отправляется во все подписанные агрегаты, и состояние каждого агрегата обновляется автоматически.

Когда новый агрегат подключается к сети и ему необходимо восстановить состояние, которое он пропустил (до того, как оно существовало), он должен иметь возможность получить текущее состояние от каждого дочернего элемента и использовать его для построения своего текущего состояния, используя добавочные обновления от дочерних элементов для сохранения своего состояния. совокупный вид детского состояния непротиворечив.

Это шаблон, который я использую для такого рода работы, и он хорошо работает локально из коробки. По сети вам, возможно, придется обеспечить гарантии доставки, и это, как правило, легко сделать. Подробнее о том, как это сделать, можно прочитать здесь: https://petabridge.com/blog/akkadotnet-at-least-once-message-delivery/

person Aaronontheweb    schedule 27.01.2017
comment
Большое спасибо! пойдет по этому пути. - person Lutando; 27.01.2017

Некоторые серверные части Akka.Persistence (т. е. работающие с SQL) также реализуют что-то известное как Akka.Persistence.Query. Это позволяет вам подписаться на поток создаваемых событий и использовать его в качестве источника для Akka.Streams. семантика.

Если вы используете журналы SQL, вам понадобятся пакеты Akka.Persistence.Query.Sql и Akka.Streams. Оттуда вы можете создать живой (то есть постоянно обновляемый) источник событий для конкретного актера и использовать его для любых операций, которые вам нравятся, т.е. распечатать их:

using (var system = ActorSystem.Create("system"))
using (var materializer = system.Materializer())
{
    var queries = Sys.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier)

    queries.EventsByPersistenceId("<persistence-id>", 0, long.MaxValue)
        .Select(envelope => envelope.Event)
        .RunForEach(e => Console.WriteLine(e), materializer);
}
person Bartosz Sypytkowski    schedule 28.01.2017
comment
У меня очень странное поведение: метод RunForEach является ожидаемым, и когда я жду его, он выводит события в консоль, но метод не завершается и создает своего рода тупиковую блокировку! (Я использую Postgresql в качестве своей БД) - person rubiktubik; 02.01.2018
comment
Какой запрос вы используете? EventsByPersistenceId — это живой запрос. Это означает, что он не завершится, пока не будет достигнуто максимальное значение sequenceNr. Если вы хотите запросить только сохраненные данные, используйте CurrentEventsByPersistenceId (или другие методы, начинающиеся с префикса Current...). - person Bartosz Sypytkowski; 02.01.2018
comment
Использование CurrentEventsByPersistenceId вместо EventsByPersistenceId было решением! Благодарю вас! - person rubiktubik; 03.01.2018