Akka-Streams ActorPublisher не получает сообщений запроса

Я пытаюсь постоянно читать IRC-канал википедии, используя эту библиотеку: https://github.com/implydata/wikiticker

Я создал собственный Akka Publisher, который будет использоваться в моей системе как Source.

Вот некоторые из моих классов:

class IrcPublisher() extends ActorPublisher[String] {
  import scala.collection._

  var queue: mutable.Queue[String] = mutable.Queue()

  override def receive: Actor.Receive = {
    case Publish(s) =>
      println(s"->MSG, isActive = $isActive, totalDemand = $totalDemand")
      queue.enqueue(s)
      publishIfNeeded()

    case Request(cnt) =>
      println("Request: " + cnt)
      publishIfNeeded()

    case Cancel =>
      println("Cancel")
      context.stop(self)

    case _ =>
      println("Hm...")
  }

  def publishIfNeeded(): Unit = {
    while (queue.nonEmpty && isActive && totalDemand > 0) {
      println("onNext")
      onNext(queue.dequeue())
    }
  }
 }

object IrcPublisher {
  case class Publish(data: String)
}

Я создаю все эти объекты так:

  def createSource(wikipedias: Seq[String]) {
      val dataPublisherRef = system.actorOf(Props[IrcPublisher])
      val dataPublisher = ActorPublisher[String](dataPublisherRef)
      val listener = new MessageListener {
        override def process(message: Message) = {
          dataPublisherRef ! Publish(Jackson.generate(message.toMap))
        }
      }

      val ticker = new IrcTicker(
        "irc.wikimedia.org",
        "imply",
        wikipedias map (x => s"#$x.wikipedia"),
        Seq(listener)
      )

      ticker.start() // if I comment this...
      Thread.currentThread().join() //... and this I get Request(...)

      Source.fromPublisher(dataPublisher)
}

Итак, проблема, с которой я столкнулся, - это объект Source. Хотя эта реализация хорошо работает с другими источниками (например, из локального файла), ActorPublisher не получает сообщения Request().

Если я прокомментирую две отмеченные строки, я увижу, что мой актор получил сообщение Request(count) из моего потока. В противном случае все сообщения будут помещаться в очередь, но не в мой поток (поэтому я могу видеть напечатанные сообщения MSG).

Я думаю, что здесь что-то с многопоточностью/синхронизацией.


person codejitsu    schedule 31.01.2016    source источник


Ответы (2)


Я недостаточно знаком с wikiticker, чтобы решить вашу проблему как должное. У меня был бы один вопрос: зачем нужно присоединяться к текущему потоку?

Однако я думаю, что вы слишком усложнили использование Source. Вам было бы проще работать с потоком в целом, чем создавать ActorPublisher собственный.

Вы можете использовать Source.actorRef для материализации потока в ActorRef и работать с этим ActorRef. Это позволяет вам использовать код akka для постановки/удаления очереди в буфер, в то время как вы можете сосредоточиться на «бизнес-логике».

Скажем, например, весь ваш поток предназначен только для фильтрации строк выше определенной длины и вывода их на консоль. Это может быть достигнуто с помощью:

def dispatchIRCMessages(actorRef : ActorRef) = {
  val ticker = 
     new IrcTicker("irc.wikimedia.org",
                   "imply",
                   wikipedias map (x => s"#$x.wikipedia"),
                   Seq(new MessageListener {
                         override def process(message: Message) = 
                          actorRef ! Publish(Jackson.generate(message.toMap))
                       }))

  ticker.start()
  Thread.currentThread().join()
}


//these variables control the buffer behavior
val bufferSize = 1024
val overFlowStrategy = akka.stream.OverflowStrategy.dropHead

val minMessageSize = 32

//no need for a custom Publisher/Queue
val streamRef = 
  Source.actorRef[String](bufferSize, overFlowStrategy)
        .via(Flow[String].filter(_.size > minMessageSize))
        .to(Sink.foreach[String](println))
        .run()

dispatchIRCMessages(streamRef)

Дополнительным преимуществом dispatchIRCMessages является то, что он будет работать с любым ActorRef, поэтому вам не нужно работать только с потоками/издателями.

Надеюсь, это решит вашу основную проблему...

person Ramón J Romero y Vigil    schedule 31.01.2016
comment
Мне нужно подождать в потоке тикера (вызов соединения), потому что тикеру нужно некоторое время для правильной инициализации. В моих экспериментах, если я не присоединяюсь к потокам, я получаю некоторые исключения ввода-вывода (InterruptedException) из тикера. Итак, я думаю, это потому, что некоторые фоновые потоки работают с Akka. - person codejitsu; 01.02.2016

Я думаю, что основная проблема Thread.currentThread().join(). Эта строка «повесит» текущий поток, потому что этот поток ждет своей смерти. Прочтите https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#join-long- .

person grzesiekw    schedule 31.01.2016