Я пытаюсь постоянно читать IRC-канал википедии, используя эту библиотеку: https://github.com/implydata/wikiticker а>
Я создал собственный Akka Publisher, который будет использоваться в моей системе как Source
.
Вот некоторые из моих классов:
class IrcPublisher() extends ActorPublisher[String] {
import scala.collection._
var queue: mutable.Queue[String] = mutable.Queue()
override def receive: Actor.Receive = {
case Publish(s) =>
println(s"->MSG, isActive = $isActive, totalDemand = $totalDemand")
queue.enqueue(s)
publishIfNeeded()
case Request(cnt) =>
println("Request: " + cnt)
publishIfNeeded()
case Cancel =>
println("Cancel")
context.stop(self)
case _ =>
println("Hm...")
}
def publishIfNeeded(): Unit = {
while (queue.nonEmpty && isActive && totalDemand > 0) {
println("onNext")
onNext(queue.dequeue())
}
}
}
object IrcPublisher {
case class Publish(data: String)
}
Я создаю все эти объекты так:
def createSource(wikipedias: Seq[String]) {
val dataPublisherRef = system.actorOf(Props[IrcPublisher])
val dataPublisher = ActorPublisher[String](dataPublisherRef)
val listener = new MessageListener {
override def process(message: Message) = {
dataPublisherRef ! Publish(Jackson.generate(message.toMap))
}
}
val ticker = new IrcTicker(
"irc.wikimedia.org",
"imply",
wikipedias map (x => s"#$x.wikipedia"),
Seq(listener)
)
ticker.start() // if I comment this...
Thread.currentThread().join() //... and this I get Request(...)
Source.fromPublisher(dataPublisher)
}
Итак, проблема, с которой я столкнулся, - это объект Source
. Хотя эта реализация хорошо работает с другими источниками (например, из локального файла), ActorPublisher не получает сообщения Request()
.
Если я прокомментирую две отмеченные строки, я увижу, что мой актор получил сообщение Request(count) из моего потока. В противном случае все сообщения будут помещаться в очередь, но не в мой поток (поэтому я могу видеть напечатанные сообщения MSG).
Я думаю, что здесь что-то с многопоточностью/синхронизацией.