акка-потоки с акка-кластером

Мой акка-стрим продолжается. Я хочу интегрировать свое приложение akka-streams с akka -cluster и DistributedPubSubMediator.

Добавить поддержку публикации довольно просто, но с частью подписки у меня проблемы.

Для справки: подписчик указан в Типичный пример:

class ChatClient(name: String) extends Actor {
  val mediator = DistributedPubSub(context.system).mediator
  mediator ! Subscribe("some topic", self)

  def receive = {
    case ChatClient.Message(from, text) =>
      ...process message...
  }
}

Мой вопрос в том, как мне интегрировать этого актера с моим потоком и как обеспечить получение сообщений публикации при отсутствии противодавления потока?

Я пытаюсь реализовать модель pubsub, в которой один поток может публиковать сообщение, а другой поток будет использовать его (если подписан).


person Will I Am    schedule 03.02.2016    source источник
comment
Я бы хотел закрыть этот вопрос, потому что я открываю другой вопрос, более конкретный.   -  person Will I Am    schedule 05.02.2016


Ответы (3)


Вы, вероятно, захотите, чтобы ваш Актер расширил ActorPublisher. Затем вы можете создать из него источник и интегрировать его в свой поток.

См. Документацию по ActorPublisher здесь: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-integrations.html

person jamesmulcahy    schedule 03.02.2016
comment
Я не понимаю, что вы обычно создаете одного актера на подписку. Итак, когда я обрабатываю сообщения подписки, сам Flow становится фабрикой акторов. Поток будет создавать по одному субъекту для каждой желаемой подписки, и эти субъекты будут привязаны к жизненному циклу потока (если я не получу запрос на отмену подписки). Перечитаю документацию, может лампочка погаснет. - person Will I Am; 03.02.2016
comment
Я понял, как создать актора для каждого потока, продолжая работать с двумя источниками. Спасибо за предложение, буду обновлять. - person Will I Am; 03.02.2016

По этой теме есть очень хорошая презентация на YouTube. Интеграция с кластером близится к завершению, но в целом разговор достаточно информативный.

person Ramón J Romero y Vigil    schedule 03.02.2016
comment
Спасибо. Однако я все еще не понимаю, как заставить ActorPublisher писать в мой http-поток. У меня есть источник актераPublisher (src), как в Flow [A] .to (Sink.ignore) .runWith (src), но я думаю, что мне нужно отправить его в мой приемник соединения akka-http вместо Sink.ignore. Но я не уверен, как получить эту раковину и правильно ли это делать. Все работает, за исключением того, что не могу разобраться с инъекционной частью. - person Will I Am; 04.02.2016

Остальные ответы устарели: они предлагают использовать _1 _, который устарел с версии 2.5.0.

Для тех, кто интересуется текущим подходом, Колин Брек написал в своем блоге отличную серию статей об интеграции Akka Streams и актеров Akka. По ходу сериала Брек детализирует систему, которая начинается с Akka Streams и простых актеров, а затем включает Akka Cluster и Akka Persistence. Первый пост в этой серии находится здесь (часть обработки распределенного потока находится в части 3).

person Jeffrey Chung    schedule 16.11.2017