Материализация графа внутри актора

Я пытаюсь материализовать график внутри актера. Кажется, это работает, если верно одно из следующих условий:

  1. График не содержит трансляции (созданной с помощью alsoTo) или
  2. Один и тот же ActorMaterializer используется для каждой материализации или
  3. Граф материализуется за пределами Actor

Я сократил его до следующих тестов:

import java.util.concurrent.{CountDownLatch, TimeUnit}

import akka.NotUsed
import akka.actor.{Actor, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import akka.testkit.{TestActorRef, TestKit}

import org.scalatest.{FlatSpecLike, Matchers}

class ActorFlowTest extends TestKit(ActorSystem("ActorFlowTest")) with Matchers with FlatSpecLike {

  def createGraph(withBroadcast: Boolean) = {
    if (withBroadcast) Source.empty.alsoTo(Sink.ignore).to(Sink.ignore)
    else Source.empty.to(Sink.ignore)
  }

  case object Bomb

  class FlowActor(
    graph: RunnableGraph[NotUsed],
    latch: CountDownLatch,
    materializer: (ActorSystem) => ActorMaterializer
  ) extends Actor {

    override def preStart(): Unit = {
      graph.run()(materializer(context.system))
      latch.countDown()
    }

    override def receive: Receive = {
      case Bomb => throw new RuntimeException
    }
  }

  "Without an actor" should "be able to materialize twice" in {
    val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore)
    val materializer1 = ActorMaterializer()(system)
    val materializer2 = ActorMaterializer()(system)
    graph.run()(materializer1)
    graph.run()(materializer2) // Pass
  }

  "With a the same materializer" should "be able to materialize twice" in {
    val graph = createGraph(withBroadcast = true)
    val latch = new CountDownLatch(2)
    val materializer = ActorMaterializer()(system)
    val actorRef = TestActorRef(new FlowActor(graph, latch, _ => materializer))
    verify(actorRef, latch) should be(true) // Pass
  }

  "With a new materializer but no broadcast" should "be able to materialize twice" in {
    val graph = createGraph(withBroadcast = false)
    val latch = new CountDownLatch(2)
    def materializer(system: ActorSystem) = ActorMaterializer()(system)
    val actorRef = TestActorRef(new FlowActor(graph, latch, materializer))
    verify(actorRef, latch) should be(true) // Pass
  }

  "With a new materializer and a broadcast" should "be able to materialize twice" in {
    val graph = createGraph(withBroadcast = true)
    val latch = new CountDownLatch(2)
    def materializer(system: ActorSystem) = ActorMaterializer()(system)
    val actorRef = TestActorRef(new FlowActor(graph, latch, materializer))
    verify(actorRef, latch) should be(true) // Fail
  }

  def verify(actorRef: TestActorRef[_], latch: CountDownLatch): Boolean = {
    actorRef.start()
    actorRef ! Bomb
    latch.await(25, TimeUnit.SECONDS)
  }
}

Кажется, что последние случаи всегда будут истекать по тайм-ауту со следующей ошибкой в ​​журнале:

[ERROR] [07/05/2016 16:06:30.625] [ActorFlowTest-akka.actor.default-dispatcher-6] [akka://ActorFlowTest/user/$$c] Futures timed out after [20000 milliseconds]
akka.actor.PostRestartException: akka://ActorFlowTest/user/$$c: exception post restart (class java.lang.RuntimeException)
    at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:250)
    at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:248)
    at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:303)
    at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:298)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:248)
    at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76)
    at akka.actor.ActorCell.faultRecreate(ActorCell.scala:374)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:464)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.testkit.CallingThreadDispatcher.process$1(CallingThreadDispatcher.scala:243)
    at akka.testkit.CallingThreadDispatcher.runQueue(CallingThreadDispatcher.scala:283)
    at akka.testkit.CallingThreadDispatcher.systemDispatch(CallingThreadDispatcher.scala:191)
    at akka.actor.dungeon.Dispatch$class.restart(Dispatch.scala:119)
    at akka.actor.ActorCell.restart(ActorCell.scala:374)
    at akka.actor.LocalActorRef.restart(ActorRef.scala:406)
    at akka.actor.SupervisorStrategy.restartChild(FaultHandling.scala:365)
    at akka.actor.OneForOneStrategy.processFailure(FaultHandling.scala:518)
    at akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:303)
    at akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:263)
    at akka.actor.ActorCell.handleFailure(ActorCell.scala:374)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:167)
    at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:165)
    at scala.concurrent.Await$.result(package.scala:190)
    at akka.stream.impl.ActorMaterializerImpl.actorOf(ActorMaterializerImpl.scala:207)
    at akka.stream.impl.ActorMaterializerImpl$$anon$2.matGraph(ActorMaterializerImpl.scala:166)
    at akka.stream.impl.ActorMaterializerImpl$$anon$2.materializeAtomic(ActorMaterializerImpl.scala:150)
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:919)
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
    at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915)
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:922)
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915)
    at scala.collection.immutable.Set$Set4.foreach(Set.scala:200)
    at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915)
    at akka.stream.impl.MaterializerSession.materialize(StreamLayout.scala:882)
    at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:182)
    at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:80)
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:351)
    at ActorFlowTest$FlowActor.preStart(ActorFlowTest.scala:40)
    at akka.actor.Actor$class.postRestart(Actor.scala:566)
    at ActorFlowTest$FlowActor.postRestart(ActorFlowTest.scala:33)
    at akka.actor.Actor$class.aroundPostRestart(Actor.scala:504)
    at ActorFlowTest$FlowActor.aroundPostRestart(ActorFlowTest.scala:33)
    at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:239)
    ... 25 more

Я попытался явно завершить ActorMaterializers, но это не воспроизводит проблему.

Обходной путь состоит в том, чтобы создать замыкание вокруг ActorMaterializer в Props, но если это также произошло из другого Actor, я беспокоюсь, что в конечном итоге у меня возникнут аналогичные проблемы.

Любая идея, почему это? Очевидно, это как-то связано с ActorMaterializer, но интересно, как удаление широковещания также решает эту проблему (даже с гораздо более сложным графиком).


person steinybot    schedule 05.07.2016    source источник
comment
Можете ли вы попробовать переопределить postRestart и проверить, что там происходит (проверить входящее исключение и посмотреть, достигает ли его код)? Также не забудьте вызвать preStart при переопределении postRestart.   -  person thwiegan    schedule 05.07.2016
comment
RuntimeException, брошенный в receive, является reason в postRestart. TimeoutException выбрасывается, когда postRestart вызывает preStart и когда тот вызывает graph.run.   -  person steinybot    schedule 05.07.2016


Ответы (2)


Кажется, это связано с надзором (или, по крайней мере, решается с помощью надлежащего надзора). Я создал дополнительный Supervisor-Actor, который для демонстрационных целей просто запускает один FlowActor в своей preStart функции и пересылает ему сообщения Bomb. Следующие тесты выполняются успешно без каких-либо исключений тайм-аута:

import java.util.concurrent.{CountDownLatch, TimeUnit}

import akka.NotUsed
import akka.actor.Actor.Receive
import akka.actor.SupervisorStrategy._
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import akka.testkit.{TestActorRef, TestKit}
import org.scalatest.{FlatSpecLike, Matchers}

import scala.concurrent.duration._

class ActorFlowTest extends TestKit(ActorSystem("TrikloSystem")) with Matchers with FlatSpecLike {

  def createGraph(withBroadcast: Boolean) = {
    if (withBroadcast) Source.empty.alsoTo(Sink.ignore).to(Sink.ignore)
    else Source.empty.to(Sink.ignore)
  }

  case object Bomb

  class Supervisor( graph: RunnableGraph[NotUsed],
                    latch: CountDownLatch,
                    materializer: (ActorSystem) => ActorMaterializer) extends Actor {

    var actorRef: Option[ActorRef] = None

    override def preStart(): Unit = {
      actorRef = Some(context.actorOf(Props( new FlowActor(graph, latch, materializer))))
    }

    override def receive: Receive = {
      case Bomb => actorRef.map( _ ! Bomb )
    }
  }

  class FlowActor(
                   graph: RunnableGraph[NotUsed],
                   latch: CountDownLatch,
                   materializer: (ActorSystem) => ActorMaterializer
                 ) extends Actor {

    override def preStart(): Unit = {
      graph.run()(materializer(context.system))
      latch.countDown()
    }

    override def receive: Receive = {
      case Bomb =>
        throw new RuntimeException
    }
  }

  "Without an actor" should "be able to materialize twice" in {
    val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore)
    val materializer1 = ActorMaterializer()(system)
    val materializer2 = ActorMaterializer()(system)
    graph.run()(materializer1)
    graph.run()(materializer2) // Pass
  }

  "With a the same materializer" should "be able to materialize twice" in {
    val graph = createGraph(withBroadcast = true)
    val latch = new CountDownLatch(2)
    val materializer = ActorMaterializer()(system)
    val actorRef = TestActorRef(new Supervisor(graph, latch, _ => materializer))
    verify(actorRef, latch) should be(true) // Pass
  }

  "With a new materializer but no broadcast" should "be able to materialize twice" in {
    val graph = createGraph(withBroadcast = false)
    val latch = new CountDownLatch(2)
    def materializer(system: ActorSystem) = ActorMaterializer()(system)
    val actorRef = TestActorRef(new Supervisor(graph, latch, materializer))
    verify(actorRef, latch) should be(true) // Pass
  }

  "With a new materializer and a broadcast" should "be able to materialize twice" in {
    val graph = createGraph(withBroadcast = true)
    val latch = new CountDownLatch(2)
    def materializer(system: ActorSystem) = ActorMaterializer()(system)
    val actorRef = TestActorRef(new Supervisor(graph, latch, materializer))
    verify(actorRef, latch) should be(true) // Fail
  }

  def verify(actorRef: TestActorRef[_], latch: CountDownLatch): Boolean = {
    actorRef.start()
    actorRef ! Bomb
    latch.await(25, TimeUnit.SECONDS)
  }
}
person thwiegan    schedule 05.07.2016
comment
Это полезно, но кажется, что это только добавляет к списку предостережений. Почему актер-хранитель пользователя должен отличаться от супервизора здесь? Другими словами, как это (или любое другое предостережение) действительно меняет поведение? - person steinybot; 11.07.2016

В этом тесте есть некоторые случаи неправильного использования Akka TestKit.

TestActorRef — это очень особенная тестовая конструкция, поскольку она будет выполняться в вызывающем потоке (CallingThreadDispatcher), чтобы обеспечить простое синхронное модульное тестирование. Использование CountDownLatch в синхронном тесте странно, поскольку любое действие выполняется в одном и том же потоке, поэтому нет необходимости в обмене данными между потоками.

Когда вы создаете экземпляр TestActorRef, он запускается в том же вызове (вы можете увидеть это, например, выбрасывая исключение из конструктора или preStart и увидев, что он попадает в ваш тестовый пример).

Вызов start для ActorRef определенно не то, что вам следует делать, особая природа TestActorRef дает вам доступ к нему, но вы, по сути, вызываете start для пустого актера оболочки, а не для актера, с которым, по вашему мнению, вы взаимодействуете (и если это было так). актер, было бы неправильно когда-либо вызывать start() на нем).

Надлежащий (но не очень полезный, так как нет проблем с материализацией графа дважды, независимо от контекста или материализатора) тест того, что вы собираетесь повторить, будет без защелки и будет выглядеть примерно так:

class FlowActor(graph: RunnableGraph[NotUsed], materializer: (ActorSystem) => ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    graph.run()(materializer(context.system))
  }
  override def receive: Receive = Actor.emptyBehavior
}

"With a new materializer and a broadcast" should "be able to materialize twice" in {
  val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore)
  def materializer(system: ActorSystem) = ActorMaterializer()(system)
  val actorRef1 = TestActorRef(new FlowActor(graph, materializer))
  val actorRef2 = TestActorRef(new FlowActor(graph, materializer))
  // we'd get an exception here if it was not possible to materialize
  // since pre-start is run on the calling thread - the same thread
  // that is executing the test case
}

Я бы просто оставил конкретные странности этого, вместо того, чтобы копаться глубже в магии в TestActorRef, это будут с трудом заработанные идеи, и они не будут применимы во многих случаях, кроме этого конкретного.

person johanandren    schedule 12.07.2016