Как протестировать исполняемый граф закрытой формы потока akka с инкапсулированным источником и приемником

Я создал поток akka, в который была передана функция процесса и функция обработчика ошибок. Source и Sink полностью инкапсулированы в ClosedShape RunnableFlow. Мое намерение состоит в том, чтобы передать элемент родительскому классу и запустить его в потоке. Все это работает, пока я не приступаю к тестированию. Я использую scala-test и передаю добавление к спискам внутри функции процесса и функции обработчика ошибок. Я случайным образом генерирую ошибки, чтобы увидеть, как что-то перетекает в функцию обработчика ошибок. Проблема в том, что если я передам 100 элементов родительскому классу, то я ожидаю, что список элементов в функции ошибок и список элементов в функции процесса будут в сумме равны 100. Поскольку источник и приемник полностью инкапсулированы, я этого не делаю. У нас нет четкого способа сказать тесту ждать и перейти к операторам assertion/should до того, как все элементы будут обработаны в потоке. Я создал эту суть для описания потока.

Вот пример теста для приведенной выше сути:

import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._

class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
    with WordSpecLike with Matchers with BeforeAndAfterAll {
  def this() = this(ActorSystem("TestSpec"))

  override def afterAll = {
    Thread.sleep(500)
    mat.shutdown()
    TestKit.shutdownActorSystem(system)
  }

  implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))

  "TestSpec" must {
    "handle messages" in {
      val testStream = new Testing()                                                 // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
      (1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on

      testStream.errors.size should not be (0)                                       // passes
      testStream.processed.size should not be (0)                                    // passes
      (testStream.processed.size + testStream.errors.size) should be (100)           // fails due to checking before all items are processed
    }
  }
}

person Alexander Kahoun    schedule 19.02.2016    source источник


Ответы (1)


Согласно комментарию Виктора Кланга к связанному Gist. Это оказывается отличным решением:

def consume(
    errorHandler: BadData => Unit, fn: Data => Unit, a: String
  ): RunnableGraph[Future[akka.Done]] = RunnableGraph.fromGraph(
    GraphDSL.create(Sink.foreach[BadData](errorHandler)) { implicit b: GraphDSL.Builder[Unit] => sink =>
      import GraphDSL.Implicits._

      val source = b.add(Source.single(a))
      val broadcast = b.add(Broadcast[String](2))
      val merge = b.add(Zip[String, String])
      val process = new ProcessorFlow(fn)
      val failed = b.add(Flow[Xor[BadData, Data]].filter(x => x.isLeft))
      val errors = b.add(new LeftFlow[Xor[BadData, Data], BadData](
        (input: Xor[BadData, Data]) =>
          input.swap.getOrElse((new Throwable, ("", "")))
      ))

      source ~> broadcast.in
                broadcast.out(0) ~> Flow[String].map(_.reverse)       ~> merge.in0
                broadcast.out(1) ~> Flow[String].map("| " + _ + " |") ~> merge.in1
                                                                         merge.out ~> process ~> failed ~> errors ~> sink

      ClosedShape
    }
  )

Это позволяет мне Await.result работать с RunnableGraph в целях тестирования. Еще раз спасибо Виктору за это решение!

person Alexander Kahoun    schedule 22.02.2016
comment
Хорошее решение. Пока это работает, я не понимаю, почему. Не могли бы вы объяснить, как предоставление стока в качестве аргумента превращает тип Graph Mat из NotUsed в Future[Done]? IIUC, Sink должно быть из g1: Graph[Shape, Mat] из GraphApply#create, или я что-то здесь неправильно понимаю? - person kostja; 02.09.2016
comment
Подтверждено, это работает, приемник, переданный в качестве аргумента GraphDSL.create, меняет тип возвращаемого значения и позволяет дождаться завершения графа. Это похоже на взлом, и я очень удивлен, что нет более интуитивно понятного решения для этого. Кто-нибудь знает о другом способе сделать это? - person Nicolas Delaforge; 28.11.2016
comment
Это работает, потому что Sink.foreach делает мат за вас. игнорировать)(сохранять.право).named(foreachSink) - person Jim; 21.04.2020