Я создал поток akka, в который была передана функция процесса и функция обработчика ошибок. Source
и Sink
полностью инкапсулированы в ClosedShape
RunnableFlow
. Мое намерение состоит в том, чтобы передать элемент родительскому классу и запустить его в потоке. Все это работает, пока я не приступаю к тестированию. Я использую scala-test и передаю добавление к спискам внутри функции процесса и функции обработчика ошибок. Я случайным образом генерирую ошибки, чтобы увидеть, как что-то перетекает в функцию обработчика ошибок. Проблема в том, что если я передам 100 элементов родительскому классу, то я ожидаю, что список элементов в функции ошибок и список элементов в функции процесса будут в сумме равны 100. Поскольку источник и приемник полностью инкапсулированы, я этого не делаю. У нас нет четкого способа сказать тесту ждать и перейти к операторам assertion/should до того, как все элементы будут обработаны в потоке. Я создал эту суть для описания потока.
Вот пример теста для приведенной выше сути:
import akka.actor._
import akka.stream._
import akka.testkit._
import org.scalatest._
class TestSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
def this() = this(ActorSystem("TestSpec"))
override def afterAll = {
Thread.sleep(500)
mat.shutdown()
TestKit.shutdownActorSystem(system)
}
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true))
"TestSpec" must {
"handle messages" in {
val testStream = new Testing() // For Testing class see gist: https://gist.github.com/leftofnull/3e4c2a6b18fe71d219b6
(1 to 100).map(n => testStream.processString(s"${n}${n * 2}${n * 4}${n * 8}")) // Give it 100 strings to chew on
testStream.errors.size should not be (0) // passes
testStream.processed.size should not be (0) // passes
(testStream.processed.size + testStream.errors.size) should be (100) // fails due to checking before all items are processed
}
}
}