Регулировка акка-актора, сохраняющая только самые свежие сообщения

Ситуация

Я использую акторы akka для обновления данных в моем веб-клиенте. Один из этих участников несет полную ответственность за отправку обновлений, касающихся отдельных Agent. Эти агенты обновляются очень быстро (каждые 10 мс). Моя цель сейчас - ограничить этот механизм обновления, чтобы новейшая версия каждого Agent отправлялась каждые 300 мс.

Мой код

Вот что я до сих пор придумал:

/**
  * Single agents are updated very rapidly. To limit the burden on the web-frontend, we throttle the messages here.
  */
class BroadcastSingleAgentActor extends Actor {

    private implicit val ec: ExecutionContextExecutor = context.dispatcher
    private var queue = Set[Agent]()

    context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
        queue.foreach { a =>
            broadcastAgent(self)(a) // sends the message to all connected clients
        }
        queue = Set()
    }

    override def receive: Receive = {
        // this message is received every 10 ms for every agent present
        case BroadcastAgent(agent) => 
           // only keep the newest version of the agent
           queue = queue.filter(_.id != agent.id) + agent
    }

}

Вопрос

Этот субъект (BroadcastSingleAgentActor) работает так, как ожидалось, но я не уверен на 100%, является ли он потокобезопасным (обновление queue при его возможной очистке). Кроме того, мне не кажется, что я максимально использую инструменты, которые предоставляет мне Akka. Я нашел эту статью (Регулирование сообщений в Akka 2), но моя проблема в том, что мне нужно сохранить самое новое Agent сообщение, удаляя любую его старую версию. Есть ли где-нибудь пример, похожий на то, что мне нужно?


person Florian Baierl    schedule 26.03.2019    source источник


Ответы (1)


Нет, это небезопасно для потоков, потому что планирование через ActorSystem будет происходить в другом потоке, чем receive. Одна из возможных идей состоит в том, чтобы выполнять планирование в receive методе, потому что входящие в BroadcastSingleAgentActor сообщения будут обрабатываться последовательно.

  override def receive: Receive = {

    case Refresh =>
      context.system.scheduler.schedule(0 seconds, 300 milliseconds) {
        queue.foreach { a =>
          broadcastAgent(self)(a) // sends the message to all connected clients
        }
      }
      queue = Set()
    // this message is received every 10 ms for every agent present
    case BroadcastAgent(agent) =>
      // only keep the newest version of the agent
      queue = queue.filter(_.id != agent.id) + agent
  }
person Dylan Grald    schedule 26.03.2019