Широковещательное сообщение для маршрутов в ClusterRouter в Akka

Я пытаюсь передать сообщение всем маршрутам в конфигурации ClusterRouter. Я уже пробовал два варианта. Вот этот:

 val workerRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(AdaptiveLoadBalancingRouter(metrics), ClusterRouterSettings(
      totalInstances = 100, routeesPath = "/user/slave",
      allowLocalRoutees = true, useRole = None))), name = "slaveRouter")

  context.system.scheduler.schedule(2 seconds, 5 seconds, workerRouter, Broadcast(CapabilityRequest))

И этот:

 val broadcastRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(BroadcastRouter(Nil), ClusterRouterSettings(
      totalInstances = 100, routeesPath = "/user/slave",
      allowLocalRoutees = true, useRole = None))), name = "slaveRouter")

  context.system.scheduler.schedule(2 seconds, 5 seconds, broadcastRouter, CapabilityRequest)

Но для них обоих только один из slaves получает сообщение. Мысли?


Чтобы понять, почему я считаю, что первая попытка должна была сработать, нужно взглянуть на AdaptiveLoadBalancingRounter.scala в трейте AdaptiveLoadBalancingRouterLike при создании Route:

{
  case (sender, message) ⇒
    message match {
      case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
      case msg            ⇒ List(Destination(sender, getNext()))
    }
}

person Hugo Sereno Ferreira    schedule 20.05.2013    source источник
comment
Как я спросил в списке рассылки: можете ли вы представить доказательство того, что ваш кластер на самом деле имеет более одного члена, когда вы отправляете сообщения?   -  person Roland Kuhn    schedule 22.05.2013
comment
Я могу дать вам весь код, да, но опять же: (i) RoundRobinRouter отправляет сообщения всем членам и (ii) ручная трансляция, когда я перебираю всех участников сети, кажется, работает.   -  person Hugo Sereno Ferreira    schedule 22.05.2013


Ответы (1)


В вашем первом примере вы используете маршрутизатор, который будет отправлять только один маршрут. Из документов, которые я прочитал, этот маршрутизатор будет использовать метрики, доступные из разных узлов, чтобы выбрать узел, который, по-видимому, находится под наименьшим принуждением, и отправить сообщение маршруту, который находится на этом узле. Я думаю, что поведение, которое вы видите для этой настройки, ожидаемо.

Что касается вашего второго примера, я ничего не нашел в документах об использовании BraodcastRouter в кластерной среде, поэтому я не уверен, что этот подход поддерживается. Сказав это, я предполагаю, что создание BraodcastRouter с пустым списком маршрутов (Nil) является причиной поведения, которое вы видите. Я думаю, если вы измените это на BroadcastRouter(100), вы можете увидеть другое поведение. Но опять же, я не думаю (из-за отсутствия примера в документации), что использование BroadcastRouter поддерживается (и я могу ошибаться).

Можете ли вы немного подробнее объяснить свой вариант использования, чтобы я мог понять, зачем вам нужен маршрутизатор широковещательного типа для вашего кластера?

Изменить

FWIW, у меня все работает со следующим кодом. Во-первых, конфиг:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    log-remote-lifecycle-events = off
    netty {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    min-nr-of-members = 2
    seed-nodes = [
      "akka://[email protected]:2551", 
      "akka://[email protected]:2552"]

    auto-down = on
  }
}

Затем я запустил два узла (один на 2551, другой на 2552), используя следующий код:

object ClusterNode {

  def main(args: Array[String]): Unit = {

    // Override the configuration of the port 
    // when specified as program argument
    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))


    // Create an Akka system
    val system = ActorSystem("ClusterSystem")
    val clusterListener = system.actorOf(Props(new Actor with ActorLogging {
      def receive = {
        case state: CurrentClusterState =>
          log.info("Current members: {}", state.members)
        case MemberJoined(member) =>
          log.info("Member joined: {}", member)
        case MemberUp(member) =>
          log.info("Member is Up: {}", member)
        case UnreachableMember(member) =>
          log.info("Member detected as unreachable: {}", member)
        case _: ClusterDomainEvent => // ignore

      }
    }), name = "clusterListener")

    Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])    
  }

}

class FooActor extends Actor{

  override def preStart = {
    println("Foo actor started on path: " + context.self.path)
  }

  def receive = {
    case msg => println(context.self.path + " received message: " + msg)
  }
}

Затем я запустил третий «узел», мой клиентский узел, используя следующий код:

object ClusterClient {
  def main(args: Array[String]) {
    val system = ActorSystem("ClusterSystem")

    Cluster(system) registerOnMemberUp{
      val router = system.actorOf(Props[FooActor].withRouter(
        ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector),
        ClusterRouterSettings(
        totalInstances = 20, maxInstancesPerNode = 10,
        allowLocalRoutees = false))),
        name = "fooRouter")  

     router ! Broadcast("bar")
    }
  }
}

Когда сообщение было отправлено, я увидел, что оно было получено на обеих виртуальных машинах узла сервера, по 10 участников на каждую виртуальную машину.

Отличие моего роутера от вашего в том, что я не указал локальные маршруты и заменил routeesPath на maxInstancesPerNode. Надеюсь, это поможет.

person cmbaxter    schedule 21.05.2013
comment
Интересно. Я просмотрел весь источник, и я также не вижу хорошего объяснения, почему это не работает. Вы видите, что перед отправкой широковещательного сообщения создается 100 ведомых устройств? - person cmbaxter; 21.05.2013
comment
Мне придется проработать ваш пример, так как в моем случае routeesPath необходим, чтобы разобраться, какие маршруты подходят. Я предполагаю, что вы используете разных актеров на разных виртуальных машинах, верно? - person Hugo Sereno Ferreira; 22.05.2013
comment
Да. Всего у меня было запущено 3 JVM (два сервисных узла, один клиентский узел), но все было локально для моего Mac. В моем примере экземпляры акторов не развертываются до тех пор, пока я не запущу клиентский код и не запущу маршрутизатор с поддержкой кластера. Я думаю, что все по-другому, когда вы сами запускаете экземпляры актера на каждом сервисном узле, когда этот сервисный узел запускается. - person cmbaxter; 22.05.2013
comment
Я тоже использую Mac, так что... Хммм... Какую версию Akka вы используете? 2.1.х или 2.2-Мх? - person Hugo Sereno Ferreira; 23.05.2013
comment
Хорошо, может быть, тот факт, что я использую промежуточные выпуски 2.2, объясняет это: P - person Hugo Sereno Ferreira; 23.05.2013