испускать несколько объектов из PushPullStage

Я играю с Akka-Streams и пытаюсь создать собственный Flow, реализуя свой собственный PushPullStage. Я хочу, чтобы Flow накапливал объекты, которые он получает от восходящего потока, в список и группировал их в соответствии с некоторой функцией перед отправкой групп вниз по течению, когда восходящий поток завершается.

Кажется, что это довольно просто реализовать, но я не могу понять, как это сделать! Кажется, нет способа испустить несколько объектов из PushPullStage.

Вот моя реализация до сих пор:

class Accumulate[A] extends PushPullStage[A, List[A]] {
    private var groups: List[List[A]] = Nil

    private def group(x: A): List[List[A]] = ...

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
      groups = group(elem)
      ctx.pull()
    }

    override def onPull(ctx: Context[A]): SyncDirective =
      if (ctx.isFinishing) {
        for(group <- groups)
          ctx.push(group)    // this doesn't work

        ctx.finish()
      } else {
        ctx.pull()
      }

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
      ctx.absorbTermination()
  }
}

ИЗМЕНИТЬ

Я изменил код для учета тормозного давления, и теперь все работает. По сути, мне просто нужно было позволить нижестоящим Flow делать то, для чего они предназначены, и продолжать извлекать элементы:

class Accumulate[A] extends PushPullStage[A, List[A]] {
    private var groups: List[List[A]] = Nil

    private def group(x: A): List[List[A]] = ...

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
      groups = group(elem)
      ctx.pull()
    }

    override def onPull(ctx: Context[A]): SyncDirective =
      if (ctx.isFinishing) {
        groups match {
          case Nil => ctx.finish()

          case head :: tail =>
            groups = tail
            ctx.push(head)
        }
      } else {
        ctx.pull()
      }

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
      ctx.absorbTermination()
  }
}

person Oli    schedule 19.09.2015    source источник


Ответы (1)


Вы не можете нажимать больше, чем требуется, так как это нарушит противодавление. Кроме того, стоит отметить, что я бы не рекомендовал то, что вы пытаетесь сделать, поскольку это приведет к ошибке OutOfMemoryError для больших или неограниченных потоков.

class Accumulate[A] extends PushPullStage[A, List[A]] {
    private var groups: List[List[A]] = Nil

    private def group(x: A): List[List[A]] = ...

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
      groups = group(elem)
      ctx.pull()
    }

    override def onPull(ctx: Context[A]): SyncDirective =
      if (ctx.isFinishing) {
        groups match {
          case Nil => ctx.finish()
          case group :: rest =>
            groups = rest
            ctx.push(group)
        }
      } else {
        ctx.pull()
      }

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
      ctx.absorbTermination()
  }
}
person Viktor Klang    schedule 19.09.2015
comment
Ааа, я не подумал об обратном давлении. Я немного изменил код, и теперь он работает как положено (см. мое редактирование). Спасибо :) - person Oli; 20.09.2015