Сегодня я нашел часть моего кода, написанного на Scala с помощью Akka, когда я следил за превосходной книгой Вона Вернона Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka.
Увидев, что я недавно начал играть с Akka Typed, я решил переписать код примера Pipes and Filters с помощью типов, чтобы посмотреть, есть ли в этом какие-то преимущества.
Шаблон описан в Главе 4: Каналы и фильтры в шаблонах реактивного обмена сообщениями с акторной моделью: приложения и интеграция в Scala и Akka, автор Вон Вернон. В целях экономии времени я не буду повторять то, что уже есть в книге. Я покажу только код, и понять смысл не составит труда. Вы можете найти полный исходный код для этой публикации в мой репозиторий.
Первое, что я сделал, это преобразовал каждого актора из класса в поведение. В некоторых случаях это может быть даже не функция, а просто val.
Начнем с трех фильтров. Все эти фильтры принимают в качестве входных данных сообщения одного и того же типа и пересылают возможно измененное сообщение, но того же типа, следующему фильтру. С обычной Akka у нас нет возможности обеспечить, чтобы типы были одинаковыми, и вы согласитесь со мной, что этот факт не сразу очевиден из приведенного ниже кода:
case class ProcessIncomingOrder(orderInfo: Array[Byte]) class Authenticator(nextFilter: ActorRef) extends Actor { def receive = { case message: ProcessIncomingOrder => val text = new String(message.orderInfo) println(s"Authenticator authenticating: $text") val orderText = text.replace("(certificate)", "") nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte)) } } class Decrypter(nextFilter: ActorRef) extends Actor { def receive = { case message: ProcessIncomingOrder => val text = new String(message.orderInfo) println(s"Decrypter decrypting: $text") val orderText = text.replace("(encryption)", "") nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte)) } } class Deduplicator(nextFilter: ActorRef) extends Actor { var ids:Set[String] = Set.empty def orderIdFrom(orderText: String): String = { val orderIdIndex = orderText.indexOf("id='") + 4 val orderIdLastIndex = orderText.indexOf("'", orderIdIndex) orderText.substring(orderIdIndex, orderIdLastIndex) } def receive = { case message:ProcessIncomingOrder => val text = new String(message.orderInfo) val orderId = orderIdFrom(text) if (ids.contains(orderId)) { println(s"Deduplicator excluding duplicate: $orderId") } else { ids = ids + orderId nextFilter ! message } } }
Теперь давайте посмотрим на тот же код с типизированными акторами:
def authenticator(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[ProcessIncomingOrder] = Static { case message => val text = new String(message.orderInfo) println(s"Authenticator authenticating: $text") val orderText = text.replace("(certificate)", "") nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte)) } def decrypter(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[ProcessIncomingOrder] = Static { case message => val text = new String(message.orderInfo) println(s"Decrypter decrypting $text") val orderText = text.replace("(encryption)", "") nextFilter ! ProcessIncomingOrder(orderText.toCharArray.map(_.toByte)) } def deduplicator(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[ProcessIncomingOrder] = { var ids: Set[String] = Set.empty def orderIdFrom(orderText: String): String = { val orderIdIndex = orderText.indexOf("id='") + 4 val orderIdLastIndex = orderText.indexOf("'", orderIdIndex) orderText.substring(orderIdIndex, orderIdLastIndex) } Static { case message => val text = new String(message.orderInfo) val orderId = orderIdFrom(text) if (ids.contains(orderId)) { println(s"Deduplicator excluding duplicate: $orderId") } else { ids = ids + orderId nextFilter ! message } } }
Здесь мы видим, что каждое поведение принимает тот же тип сообщения, что и следующий актор. Контракт каждого фильтра одинаков!
Обратите также внимание на то, что нам больше не нужно писать:
case message: ProcessIncomingOrder =>
а можно просто сказать:
case message =>
это потому, что мы объявили наше поведение типа ProcessIncomingOrder, поэтому тип известен во время компиляции. Потрясающий!
Затем у нас есть два других актора: первый OrderAcceptanceEndpoint получает набор байтов и преобразует его в сообщение ProcessIncomingOrder, которое подходит для подачи в первый фильтр. Этот актор инициирует поток. В конце актор, который получает окончательное сообщение и выполняет над ним действие, называется OrderManagementSystem. Вот они оба:
class OrderAcceptanceEndpoint(nextFilter: ActorRef) extends Actor { def receive = { case rawOrder: Array[Byte] => val text = new String(rawOrder) println(s"OrderAcceptanceEndpoint processing: $text.") nextFilter ! ProcessIncomingOrder(rawOrder) } } class OrderManagementSystem extends Actor { def receive = { case message:ProcessIncomingOrder => val text = new String(message.orderInfo) println(s"OrderManagementSystem processing: $text") } }
Вот те же два актера в Akka Typed:
def orderAcceptanceEndpoint(nextFilter: ActorRef[ProcessIncomingOrder]): Behavior[Array[Byte]] = Static { case rawOrder => val text = new String(rawOrder) println(s"OrderAcceptanceEndpoint processing: $text.") nextFilter ! ProcessIncomingOrder(rawOrder) } val orderManagementSystem: Behavior[ProcessIncomingOrder] = { Static { case message => val text = new String(message.orderInfo) println(s"OrderManagementSystem processing: $text") } }
Вы можете просмотреть полный исходный код обоих примеров здесь.
Теперь я немного солгал вам здесь, поскольку очевидно, что поведение не актеры. Чтобы создать настоящего актера, мы просим систему дать нам его. Я чувствую, что это различие очень важно, но мне еще предстоит узнать его истинную силу.
В любом случае, вот как мы будем создавать экземпляры акторов и собирать всю эту систему вместе с обычной Akka:
object PipesAndFiltersDriver extends App { val orderText = "(encryption)(certificate)<order id='123'>xbc</order>" val orderText2 = "(encryption)(certificate)<order id='234'>xbcd</order>" val rawOrderBytes = orderText.toCharArray.map(_.toByte) val rawOrderBytes2 = orderText2.toCharArray.map(_.toByte) val system = ActorSystem("system") val filter5 = system.actorOf(Props[OrderManagementSystem], "orderMgmntSystem") val filter4 = system.actorOf(Props(classOf[Deduplicator], filter5), "deduplicator") val filter3 = system.actorOf(Props(classOf[Authenticator], filter4), "authenticator") val filter2 = system.actorOf(Props(classOf[Decrypter], filter3), "decrypter") val filter1 = system.actorOf(Props(classOf[OrderAcceptanceEndpoint], filter2), "orderAccEndpoint") val input = filter1 input ! rawOrderBytes input ! rawOrderBytes input ! rawOrderBytes2 // ctrl-c to shutdown the system }
С Typed Akka мы не можем просто создавать актеров без родителя. Таким образом, мы заключаем все наши создания актера в один хранитель, а затем по очереди создаем его экземпляр:
object PipesAndFiltersTypedDriver extends App { val pipesAndFiltersTypedDriver: Behavior[Array[Byte]] = ContextAware[Array[Byte]] { context => val filter5 = context.spawn(Props(Filters.orderManagementSystem), "orderMgmntSystem") val filter4 = context.spawn(Props(Filters.deduplicator(filter5)), "deduplicator") val filter3 = context.spawn(Props(Filters.authenticator(filter4)), "authenticator") val filter2 = context.spawn(Props(Filters.decrypter(filter3)), "decrypter") val filter1 = context.spawn(Props(Filters.orderAcceptanceEndpoint(filter2)), "orderAccEndpoint") Static { case message => filter1 ! message } } val orderText = "(encryption)(certificate)<order id='123'>xbc</order>" val orderText2 = "(encryption)(certificate)<order id='234'>xbcd</order>" val rawOrderBytes = orderText.toCharArray.map(_.toByte) val rawOrderBytes2 = orderText2.toCharArray.map(_.toByte) val system: ActorSystem[Array[Byte]] = ActorSystem("system", Props(pipesAndFiltersTypedDriver)) system ! rawOrderBytes system ! rawOrderBytes system ! rawOrderBytes2 // ctrc-c to shutdown the system }
Наблюдения
Переписывание кода с типизированными акторами дает сразу несколько преимуществ:
- Тут сразу меньше кода. Большую часть времени мне больше не нужно было писать полноценные классы.
- Упрощенный поток сообщений. Поскольку все варианты поведения типизированы, я мог сразу увидеть, каким образом сообщения могут входить и выходить.
- Приятно писать. Есть что-то в необходимости определять явные контракты между компонентами, а затем следить за их соблюдением во время компиляции. Это заставляет мой мозг покалывать.
Не хватает нескольких вещей, и это в основном касается инструментов:
- Инфраструктуры для тестирования пока нет. Здесь нет тестового комплекта, чтобы держать меня за руку.
- Отсутствие более подробных руководств. Хотя действительно кодовая база для Akka Typed довольно мала, и вы, вероятно, можете просмотреть большую ее часть, более подробное руководство было бы полезно. Кроме того, кто не любит читать об актерах?
- Нет руководства по идиоматическому способу ведения дел. Я уверен, что делаю вещи нетипизированным способом, и поведение может стать довольно сложным, но у меня пока нет уверенности, чтобы решить, каким путем идти. Больше примеров было бы удивительным ресурсом.
Вывод
Akka Typed — удивительное начинание. На мой взгляд, это более важно, чем работа над потоками, на которой сейчас сосредоточена Typesafe (компания, создавшая Akka). Особенно с такими новичками, как пони, кажется очевидным, что модель-актер здесь, чтобы остаться.
Это только вопрос времени, когда акторная модель станет стандартом, ее будут преподавать в университетах и использовать в корпоративных системах. Одним из самых больших недостатков Akka всегда было отсутствие гарантий времени компиляции. С появлением Akka Typed эта гарантия появилась. Я подозреваю, что в ближайшие годы мы увидим быстрый рост числа сложных параллельных систем. Перейти на эту подножку сейчас! Зачем ждать? :-)