Я играю с Akka-Streams и пытаюсь создать собственный Flow
, реализуя свой собственный PushPullStage
. Я хочу, чтобы Flow
накапливал объекты, которые он получает от восходящего потока, в список и группировал их в соответствии с некоторой функцией перед отправкой групп вниз по течению, когда восходящий поток завершается.
Кажется, что это довольно просто реализовать, но я не могу понять, как это сделать! Кажется, нет способа испустить несколько объектов из PushPullStage
.
Вот моя реализация до сих пор:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
for(group <- groups)
ctx.push(group) // this doesn't work
ctx.finish()
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
ИЗМЕНИТЬ
Я изменил код для учета тормозного давления, и теперь все работает. По сути, мне просто нужно было позволить нижестоящим Flow
делать то, для чего они предназначены, и продолжать извлекать элементы:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
groups match {
case Nil => ctx.finish()
case head :: tail =>
groups = tail
ctx.push(head)
}
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}