Ситуация
Я использую акторы akka для обновления данных в моем веб-клиенте. Один из этих участников несет полную ответственность за отправку обновлений, касающихся отдельных Agent
. Эти агенты обновляются очень быстро (каждые 10 мс). Моя цель сейчас - ограничить этот механизм обновления, чтобы новейшая версия каждого Agent
отправлялась каждые 300 мс.
Мой код
Вот что я до сих пор придумал:
/**
* Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
*/
class BroadcastSingleAgentActor extends Actor {
private implicit val ec: ExecutionContextExecutor = context.dispatcher
private var queue = Set[Agent]()
context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
queue.foreach { a =>
broadcastAgent(self)(a) // sends the message to all connected clients
}
queue = Set()
}
override def receive: Receive = {
// this message is received every 10 ms for every agent present
case BroadcastAgent(agent) =>
// only keep the newest version of the agent
queue = queue.filter(_.id != agent.id) + agent
}
}
Вопрос
Этот субъект (BroadcastSingleAgentActor
) работает так, как ожидалось, но я не уверен на 100%, является ли он потокобезопасным (обновление queue
при его возможной очистке). Кроме того, мне не кажется, что я максимально использую инструменты, которые предоставляет мне Akka. Я нашел эту статью (Регулирование сообщений в Akka 2), но моя проблема в том, что мне нужно сохранить самое новое Agent
сообщение, удаляя любую его старую версию. Есть ли где-нибудь пример, похожий на то, что мне нужно?