программа зависает при использовании нескольких фьючерсов с несколькими удаленными акторами

Я запускаю двух удаленных актеров на одном хосте, которые просто повторяют все, что им отправляется. Затем я создаю другого актера, который отправляет некоторое количество сообщений (используя !! ) обоим актерам, и сохраняю объекты List of Future, содержащие ответы от этих актеров. Затем я перебираю этот список, извлекая результат каждого Future. Проблема в том, что в большинстве случаев некоторые варианты будущего никогда не возвращаются, даже если актор утверждает, что отправил ответ. Проблема возникает случайным образом, иногда она проходит через весь список, но чаще всего она застревает в каком-то месте и висит на неопределенный срок.

Вот некоторый код, который создает проблему на моей машине:

Раковина.скала:

import scala.actors.Actor
import scala.actors.Actor._
import scala.actors.Exit
import scala.actors.remote.RemoteActor
import scala.actors.remote.RemoteActor._

object Sink {
  def main(args: Array[String]): Unit = {
     new RemoteSink("node03-0",43001).start()
     new RemoteSink("node03-1",43001).start()
   }
}
class RemoteSink(name: String, port: Int) extends Actor
{
 def act() {
    println(name+" starts")
    trapExit=true
    alive(port)
    register(Symbol(name),self)

    loop {
        react {
            case Exit(from,reason) =>{
                    exit()
            }
            case msg => reply{
                    println(name+" sending reply to: "+msg)
                    msg+" back at you from "+name
                }
        }
    }
 }
}

Источник.скала:

import scala.actors.Actor
import scala.actors.Actor._
import scala.actors.remote.Node;
import scala.actors.remote.RemoteActor
import scala.actors.remote.RemoteActor._

object Source {
    def main(args: Array[String]):Unit = {
        val peer = Node("127.0.0.1", 43001)
        val source = new RemoteSource(peer)
        source.start()
    }
}
class RemoteSource(peer: Node) extends Actor
{
    def act() {
        trapExit=true
        alive(43001)
        register(Symbol("source"),self)

        val sinks = List(select(peer,Symbol("node03-0"))
                                   ,select(peer,Symbol("node03-1"))
                                )
        sinks.foreach(link)

        val futures = for(sink <- sinks; i <- 0 to 20) yield    sink !! "hello "+i
        futures.foreach( f => println(f()))

        exit()
    }
}

Что я делаю неправильно?


person Kevin    schedule 28.07.2010    source источник
comment
Я также пытался использовать разные порты для каждого актера, но получил тот же результат.   -  person Kevin    schedule 28.07.2010


Ответы (2)


Я предполагаю, что ваша проблема связана с этой строкой:

futures.foreach( f => println(f()))

в котором вы перебираете все свои фьючерсы и блокируете их по очереди, ожидая результата. Блокировка на фьючерсах, как правило, плохая идея, и ее следует избегать. Вместо этого вы хотите указать действие, которое нужно выполнить, когда будет доступен результат будущего. Попробуй это:

futures.foreach(f => f.foreach(r => println(r)))

Вот альтернативный способ сказать это с помощью for comprehension:

for (future <- futures; result <- future) { println(result) }

Эта запись в блоге — отличный учебник по проблеме блокировка на фьючерсах и как монадические фьючерсы преодолевают ее.

person Tom Crockett    schedule 28.07.2010
comment
Спасибо за ваш ответ, я думаю, что теперь я лучше понимаю проблему. Однако этот фрагмент кода, который вы предоставили, также ведет себя странно. Он обрабатывает первое будущее, а затем завершает программу без создания исключения. Я также пытался использовать Futures.awaitAll(10000,futures), но даже через 10 секунд результаты все еще отсутствуют. - person Kevin; 29.07.2010
comment
Он завершается, потому что вы вызываете exit() после цикла. Цикл немедленно вернется с моим кодом, потому что он больше не блокирует, поэтому вы больше не хотите просто выходить оттуда. - person Tom Crockett; 29.07.2010
comment
нет, он завершается до этого, я добавил операторы печати и циклы и много других вещей для отладки. Я понимаю, что этот цикл не будет блокироваться, но после этой строки ничего не обрабатывается. Есть идеи, почему Futures.awaitAll не работает? Такое впечатление, что он был написан специально для такого рода задач. - person Kevin; 29.07.2010

Я тоже видел подобный случай. Когда код внутри потока генерирует определенные типы исключений и завершается, соответствующий future.get никогда не возвращается. Можно попробовать создать исключение java.lang.Error против java.lang.NoSuchMethodError. Соответствующее будущее последнего никогда не вернется.

person raggy    schedule 05.11.2014