Мне нужно сделать что-то действительно похожее на этот https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala
Моя проблема в том, что у меня неизвестное количество групп, и если количество параллелизма mapAsync меньше количества групп, которые я получил, и ошибка в последнем приемнике
Разрыв SynchronousFileSink (/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) из-за ошибки восходящего потока (akka.stream.impl.StreamSubscriptionTimeoutSupport $$ anon $ 2)
Я попытался поместить буфер посередине, как это было предложено в руководстве по шаблонам потоков akka http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html
groupBy {
case LoglevelPattern(level) => level
case other => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
// write lines of each group to a separate file
mapAsync(parallelism = 2) {....
но с тем же результатом
mapAsync
какой-либо цели? Что произойдет, если вместо этого вы просто воспользуетесьmap
? - person jrudolph   schedule 26.08.2015Source[Something]
(послеgroupBy
у вас естьSource[Source[Something]]
, верно?). Итак, единственное, что вам нужно сделать внутриmap
(foreach
также должно работать), - это запустить подпотоки, что является асинхронной операцией. Затем подпотоки будут выполняться сами по себе, и ваш элементmap
сможет принять следующийSource[Something]
. - person jrudolph   schedule 26.08.2015