Сегодня я нашел часть моего кода, написанного на Scala с помощью Akka, когда я следил за превосходной книгой Вона Вернона Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka.

Увидев, что я недавно начал играть с Akka Typed, я решил переписать код примера Pipes and Filters с помощью типов, чтобы посмотреть, есть ли в этом какие-то преимущества.

Шаблон описан в Главе 4: Каналы и фильтры в шаблонах реактивного обмена сообщениями с акторной моделью: приложения и интеграция в Scala и Akka, автор Вон Вернон. В целях экономии времени я не буду повторять то, что уже есть в книге. Я покажу только код, и понять смысл не составит труда. Вы можете найти полный исходный код для этой публикации в мой репозиторий.

Первое, что я сделал, это преобразовал каждого актора из класса в поведение. В некоторых случаях это может быть даже не функция, а просто val.

Начнем с трех фильтров. Все эти фильтры принимают в качестве входных данных сообщения одного и того же типа и пересылают возможно измененное сообщение, но того же типа, следующему фильтру. С обычной Akka у нас нет возможности обеспечить, чтобы типы были одинаковыми, и вы согласитесь со мной, что этот факт не сразу очевиден из приведенного ниже кода:

case class ProcessIncomingOrder(orderInfo: Array[Byte])
class Authenticator(nextFilter: ActorRef) extends Actor {
  def receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      println(s"Authenticator authenticating: $text")
      val orderText = text.replace("(certificate)", "")
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }
}
class Decrypter(nextFilter: ActorRef) extends Actor {
  def receive = {
    case message: ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      println(s"Decrypter decrypting: $text")
      val orderText = text.replace("(encryption)", "")
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }
}
class Deduplicator(nextFilter: ActorRef) extends Actor {
  var ids:Set[String] = Set.empty

  def orderIdFrom(orderText: String): String = {
      val orderIdIndex = orderText.indexOf("id='") + 4
      val orderIdLastIndex = orderText.indexOf("'", orderIdIndex)
      orderText.substring(orderIdIndex, orderIdLastIndex)
  }

  def receive = {
    case message:ProcessIncomingOrder =>
      val text = new String(message.orderInfo)
      val orderId = orderIdFrom(text)
      if (ids.contains(orderId)) {
        println(s"Deduplicator excluding duplicate: $orderId")
      }
      else {
        ids = ids + orderId
        nextFilter ! message
      }
  }
}

Теперь давайте посмотрим на тот же код с типизированными акторами:

def authenticator(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[ProcessIncomingOrder] =
  Static {
    case message =>
      val text = new String(message.orderInfo)
      println(s"Authenticator authenticating: $text")
      val orderText = text.replace("(certificate)", "")
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }

def decrypter(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[ProcessIncomingOrder] =
  Static {
    case message =>
      val text = new String(message.orderInfo)
      println(s"Decrypter decrypting $text")
      val orderText = text.replace("(encryption)", "")
      nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))
  }

def deduplicator(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[ProcessIncomingOrder] = {
  var ids: Set[String] = Set.empty
  def orderIdFrom(orderText: String): String = {
    val orderIdIndex = orderText.indexOf("id='") + 4
    val orderIdLastIndex = orderText.indexOf("'", orderIdIndex)
    orderText.substring(orderIdIndex, orderIdLastIndex)
  }

  Static {
    case message =>
      val text = new String(message.orderInfo)
      val orderId = orderIdFrom(text)
      if (ids.contains(orderId)) {
        println(s"Deduplicator excluding duplicate: $orderId")
      }
      else {
        ids = ids + orderId
        nextFilter ! message
      }
  }
}

Здесь мы видим, что каждое поведение принимает тот же тип сообщения, что и следующий актор. Контракт каждого фильтра одинаков!

Обратите также внимание на то, что нам больше не нужно писать:

case message: ProcessIncomingOrder =>

а можно просто сказать:

case message =>

это потому, что мы объявили наше поведение типа ProcessIncomingOrder, поэтому тип известен во время компиляции. Потрясающий!

Затем у нас есть два других актора: первый OrderAcceptanceEndpoint получает набор байтов и преобразует его в сообщение ProcessIncomingOrder, которое подходит для подачи в первый фильтр. Этот актор инициирует поток. В конце актор, который получает окончательное сообщение и выполняет над ним действие, называется OrderManagementSystem. Вот они оба:

class OrderAcceptanceEndpoint(nextFilter: ActorRef) extends Actor {
  def receive = {
    case rawOrder: Array[Byte] =>
      val text = new String(rawOrder)
      println(s"OrderAcceptanceEndpoint processing: $text.")

      nextFilter ! ProcessIncomingOrder(rawOrder)
  }
}
class OrderManagementSystem extends Actor {
  def receive = {
    case message:ProcessIncomingOrder =>
      val text = new String(message.orderInfo)

      println(s"OrderManagementSystem processing: $text")
  }
}

Вот те же два актера в Akka Typed:

def orderAcceptanceEndpoint(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[Array[Byte]] =
  Static {
    case rawOrder =>
      val text = new String(rawOrder)
      println(s"OrderAcceptanceEndpoint processing: $text.")

      nextFilter ! ProcessIncomingOrder(rawOrder)
  }
val orderManagementSystem: Behavior[ProcessIncomingOrder] = {
  Static {
    case message =>
      val text = new String(message.orderInfo)
      println(s"OrderManagementSystem processing: $text")
  }
}

Вы можете просмотреть полный исходный код обоих примеров здесь.

Теперь я немного солгал вам здесь, поскольку очевидно, что поведение не актеры. Чтобы создать настоящего актера, мы просим систему дать нам его. Я чувствую, что это различие очень важно, но мне еще предстоит узнать его истинную силу.

В любом случае, вот как мы будем создавать экземпляры акторов и собирать всю эту систему вместе с обычной Akka:

object PipesAndFiltersDriver extends App {
  val orderText = "(encryption)(certificate)<order id='123'>xbc</order>"
  val orderText2 = "(encryption)(certificate)<order id='234'>xbcd</order>"

  val rawOrderBytes = orderText.toCharArray.map(_.toByte)
  val rawOrderBytes2 = orderText2.toCharArray.map(_.toByte)

  val system = ActorSystem("system")
  val filter5 = system.actorOf(Props[OrderManagementSystem], "orderMgmntSystem")
  val filter4 = system.actorOf(Props(classOf[Deduplicator], filter5), "deduplicator")
  val filter3 = system.actorOf(Props(classOf[Authenticator], filter4), "authenticator")
  val filter2 = system.actorOf(Props(classOf[Decrypter], filter3), "decrypter")
  val filter1 = system.actorOf(Props(classOf[OrderAcceptanceEndpoint], filter2), "orderAccEndpoint")

  val input = filter1
  input ! rawOrderBytes
  input ! rawOrderBytes
  input ! rawOrderBytes2

  // ctrl-c to shutdown the system
}

С Typed Akka мы не можем просто создавать актеров без родителя. Таким образом, мы заключаем все наши создания актера в один хранитель, а затем по очереди создаем его экземпляр:

object PipesAndFiltersTypedDriver extends App {
  val pipesAndFiltersTypedDriver: Behavior[Array[Byte]] =
    ContextAware[Array[Byte]] {
      context =>
        val filter5 = context.spawn(Props(Filters.orderManagementSystem), "orderMgmntSystem")
        val filter4 = context.spawn(Props(Filters.deduplicator(filter5)), "deduplicator")
        val filter3 = context.spawn(Props(Filters.authenticator(filter4)), "authenticator")
        val filter2 = context.spawn(Props(Filters.decrypter(filter3)), "decrypter")
        val filter1 = context.spawn(Props(Filters.orderAcceptanceEndpoint(filter2)), "orderAccEndpoint")
      Static {
        case message => filter1 ! message
      }
    }

  val orderText = "(encryption)(certificate)<order id='123'>xbc</order>"
  val orderText2 = "(encryption)(certificate)<order id='234'>xbcd</order>"

  val rawOrderBytes = orderText.toCharArray.map(_.toByte)
  val rawOrderBytes2 = orderText2.toCharArray.map(_.toByte)

  val system: ActorSystem[Array[Byte]] = ActorSystem("system", Props(pipesAndFiltersTypedDriver))

  system ! rawOrderBytes
  system ! rawOrderBytes
  system ! rawOrderBytes2
  // ctrc-c to shutdown the system
}

Наблюдения

Переписывание кода с типизированными акторами дает сразу несколько преимуществ:

  1. Тут сразу меньше кода. Большую часть времени мне больше не нужно было писать полноценные классы.
  2. Упрощенный поток сообщений. Поскольку все варианты поведения типизированы, я мог сразу увидеть, каким образом сообщения могут входить и выходить.
  3. Приятно писать. Есть что-то в необходимости определять явные контракты между компонентами, а затем следить за их соблюдением во время компиляции. Это заставляет мой мозг покалывать.

Не хватает нескольких вещей, и это в основном касается инструментов:

  1. Инфраструктуры для тестирования пока нет. Здесь нет тестового комплекта, чтобы держать меня за руку.
  2. Отсутствие более подробных руководств. Хотя действительно кодовая база для Akka Typed довольно мала, и вы, вероятно, можете просмотреть большую ее часть, более подробное руководство было бы полезно. Кроме того, кто не любит читать об актерах?
  3. Нет руководства по идиоматическому способу ведения дел. Я уверен, что делаю вещи нетипизированным способом, и поведение может стать довольно сложным, но у меня пока нет уверенности, чтобы решить, каким путем идти. Больше примеров было бы удивительным ресурсом.

Вывод

Akka Typed — удивительное начинание. На мой взгляд, это более важно, чем работа над потоками, на которой сейчас сосредоточена Typesafe (компания, создавшая Akka). Особенно с такими новичками, как пони, кажется очевидным, что модель-актер здесь, чтобы остаться.

Это только вопрос времени, когда акторная модель станет стандартом, ее будут преподавать в университетах и ​​использовать в корпоративных системах. Одним из самых больших недостатков Akka всегда было отсутствие гарантий времени компиляции. С появлением Akka Typed эта гарантия появилась. Я подозреваю, что в ближайшие годы мы увидим быстрый рост числа сложных параллельных систем. Перейти на эту подножку сейчас! Зачем ждать? :-)