Как использовать сгруппированные подпотоки с помощью mapAsync в потоках akka

Мне нужно сделать что-то действительно похожее на этот https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala

Моя проблема в том, что у меня неизвестное количество групп, и если количество параллелизма mapAsync меньше количества групп, которые я получил, и ошибка в последнем приемнике

Разрыв SynchronousFileSink (/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) из-за ошибки восходящего потока (akka.stream.impl.StreamSubscriptionTimeoutSupport $$ anon $ 2)

Я попытался поместить буфер посередине, как это было предложено в руководстве по шаблонам потоков akka http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html

groupBy {
  case LoglevelPattern(level) => level
  case other                  => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
  // write lines of each group to a separate file
  mapAsync(parallelism = 2) {....

но с тем же результатом


person Sammyrulez    schedule 25.08.2015    source источник
comment
Интересно, служит ли использование mapAsync какой-либо цели? Что произойдет, если вместо этого вы просто воспользуетесь map?   -  person jrudolph    schedule 26.08.2015
comment
с картой группы не потребляются параллельно / асинхронно, что является моим желаемым поведением   -  person Sammyrulez    schedule 26.08.2015
comment
Я думаю, это заблуждение. Все группы представлены Source[Something] (после groupBy у вас есть Source[Source[Something]], верно?). Итак, единственное, что вам нужно сделать внутри map (foreach также должно работать), - это запустить подпотоки, что является асинхронной операцией. Затем подпотоки будут выполняться сами по себе, и ваш элемент map сможет принять следующий Source[Something].   -  person jrudolph    schedule 26.08.2015


Ответы (1)


Расширение комментария Джрудольфа, который полностью правильный ...

В этом случае вам не нужен mapAsync. В качестве базового примера предположим, что у вас есть источник кортежей

import akka.stream.scaladsl.{Source, Sink}

def data() = List(("foo", 1),
                  ("foo", 2),
                  ("bar", 1),
                  ("foo", 3),
                  ("bar", 2))

val originalSource = Source(data)

Затем вы можете выполнить groupBy, чтобы создать Источник источников

def getID(tuple : (String, Int)) = tuple._1

//a Source of (String, Source[(String, Int),_])
val groupedSource = originalSource groupBy getID

Каждый из сгруппированных источников может обрабатываться параллельно с помощью всего map, не нужно ничего особенного. Вот пример суммирования каждой группы в независимом потоке:

import akka.actor.ActorSystem
import akka.stream.ACtorMaterializer

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

def getValues(tuple : (String, Int)) = tuple._2

//does not have to be a def, we can re-use the same sink over-and-over
val sumSink = Sink.fold[Int,Int](0)(_ + _)

//a Source of (String, Future[Int])
val sumSource  = 
  groupedSource map { case (id, src) => 
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream
  }

Теперь все числа "foo" суммируются параллельно со всеми числами "bar".

mapAsync используется, когда у вас есть инкапсулированная функция, которая возвращает Future[T], и вместо этого вы пытаетесь выдать T; что не относится к вашему вопросу. Кроме того, mapAsync включает ожидание результатов, что не является реактивный ...

person Ramón J Romero y Vigil    schedule 27.11.2015
comment
Теперь, когда потоки akka были объединены с akka, а семантика Source of Sources заменена на SubFlows, как можно добиться аналогичного поведения? - person AlphaGeek; 29.03.2016
comment
@AlphaGeek Я заметил, что функциональность groupBy изменилась пару месяцев назад. В кулинарной книге есть обновленный пример, основанный на новом подходе к субпотокам: doc.akka.io/docs/akka/2.4.2/scala/stream/ - person Ramón J Romero y Vigil; 29.03.2016
comment
@RamonJRomeroyVigil Cookbook не объясняет этого. Они используют reduce, что недопустимо, если в каждом подпотоке есть миллионы элементов. Я (и автор тоже) хочу получить Source на группу. Как бы мы это сделали? - person expert; 22.01.2017
comment
Была ли когда-нибудь резолюция по этому поводу? - person Zee; 08.09.2018
comment
@Zee К сожалению, новая реализация субпотоков не передает ключ, связанный с субпотоком. Поэтому я не знаю, как сгенерировать вывод, запрошенный в вопросе ... - person Ramón J Romero y Vigil; 08.09.2018
comment
Это то же самое, что originalSource.groupBy(..).map(...).async? т.е. каждая функция карты группы будет выполняться в отдельном потоке? - person JavaTechnical; 27.05.2020
comment
@ RamónJRomeroyVigil algd.github.io/akka/2018/ 05.08 / parallel-stream-processing.html - в этом сообщении говорится иное (см. Последние 2 раздела) - person JavaTechnical; 27.05.2020