Как агрегировать элементы одного потока Akka на основе элементов другого?

Пример сценария: сгруппируйте байты потока в куски, размер которых определяется другим потоком (целых чисел).

def partition[A, B, C](
  first:Source[A, NotUsed],
  second:Source[B, NotUsed],
  aggregate:(Int => Seq[A], B) => C
):Source[C, NotUsed] = ???

val bytes:Source[Byte, NotUsed] = ???
val sizes:Source[Int, NotUsed] = ???

val chunks:Source[ByteString, NotUsed] =
  partition(bytes, sizes, (grab, count) => ByteString(grab(count)))

Моя первоначальная попытка включает комбинацию Flow#scan и Flow#prefixAndTail, но это не совсем правильно (см. ниже). Я также ознакомился с Framing, но, похоже, это неприменимо к приведенному выше примерному сценарию (и не является достаточно общим для обработки потоков, отличных от строк байтов). Я предполагаю, что мой единственный вариант — использовать Graphs (или более общий FlowOps#transform), но я почти не достаточно опытен (пока) с потоками Akka, чтобы попытаться это сделать.


Вот что я смог придумать до сих пор (конкретно для примера сценария):

val chunks:Source[ByteString, NotUsed] = sizes
  .scan(bytes prefixAndTail 0) {
    (grouped, count) => grouped flatMapConcat {
      case (chunk, remainder) => remainder prefixAndTail count
    }
  }
  .flatMapConcat(identity)
  .collect { case (chunk, _) if chunk.nonEmpty => ByteString(chunk:_*) }

person Andrey    schedule 20.03.2016    source источник


Ответы (1)


Я думаю, вы можете реализовать обработку в виде пользовательского GraphStage. На сцене будет два элемента Inlet. Один принимает байты, а другой принимает размеры. Он будет иметь один элемент Outlet, производящий значения.

Рассмотрим следующие входные потоки.

def randomChars = Iterator.continually(Random.nextPrintableChar())
def randomNumbers = Iterator.continually(math.abs(Random.nextInt() % 50))

val bytes: Source[Char, NotUsed] =
  Source.fromIterator(() => randomChars)

val sizes: Source[Int, NotUsed] =
  Source.fromIterator(() => randomNumbers).filter(_ != 0)

Затем, используя информацию, описывающую обработку пользовательского потока (http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html) вы можете создать файл GraphStage.

case class ZipFraming() extends GraphStage[FanInShape2[Int, Char, (Int, ByteString)]] {

  override def initialAttributes = Attributes.name("ZipFraming")

  override val shape: FanInShape2[Int, Char, (Int, ByteString)] =
    new FanInShape2[Int, Char, (Int, ByteString)]("ZipFraming")

  val inFrameSize: Inlet[Int] = shape.in0
  val inElements: Inlet[Char] = shape.in1

  def out: Outlet[(Int, ByteString)] = shape.out

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // we will buffer as much as 512 characters from the input
      val MaxBufferSize = 512
      // the buffer for the received chars
      var buffer = Vector.empty[Char]
      // the needed number of elements
      var needed: Int = -1
      // if the downstream is waiting
      var isDemanding = false

      override def preStart(): Unit = {
        pull(inFrameSize)
        pull(inElements)
      }

      setHandler(inElements, new InHandler {
        override def onPush(): Unit = {
          // we buffer elements as long as we can
          if (buffer.size < MaxBufferSize) {
            buffer = buffer :+ grab(inElements)
            pull(inElements)
          }
          emit()
        }
      })

      setHandler(inFrameSize, new InHandler {
        override def onPush(): Unit = {
          needed = grab(inFrameSize)
          emit()
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          isDemanding = true
          emit()
        }
      })

      def emit(): Unit = {
        if (needed > 0 && buffer.length >= needed && isDemanding) {
          val (emit, reminder) = buffer.splitAt(needed)
          push(out, (needed, ByteString(emit.map(_.toByte).toArray)))
          buffer = reminder
          needed = -1
          isDemanding = false
          pull(inFrameSize)
          if (!hasBeenPulled(inElements)) pull(inElements)
        }
      }
    }
}

И вот как вы его запускаете.

RunnableGraph.fromGraph(GraphDSL.create(bytes, sizes)(Keep.none) { implicit b =>
  (bs, ss) =>
    import GraphDSL.Implicits._

    val zipFraming = b.add(ZipFraming())

    ss ~> zipFraming.in0
    bs ~> zipFraming.in1

    zipFraming.out ~> Sink.foreach[(Int, ByteString)](e => println((e._1, e._2.utf8String)))

    ClosedShape
}).run()
person lpiepiora    schedule 20.03.2016
comment
проблема с этим подходом заключается в том, что число символов size будет всегда выбираться из переднего числа bytes, поэтому все элементы результирующего потока будут иметь один и тот же префикс. - person Andrey; 20.03.2016
comment
@Андрей ты прав. Я обновил свой ответ фактической реализацией GraphStage, которая работает как что-то среднее между Zip и Framing. - person lpiepiora; 20.03.2016