Пример сценария: сгруппируйте байты потока в куски, размер которых определяется другим потоком (целых чисел).
def partition[A, B, C](
first:Source[A, NotUsed],
second:Source[B, NotUsed],
aggregate:(Int => Seq[A], B) => C
):Source[C, NotUsed] = ???
val bytes:Source[Byte, NotUsed] = ???
val sizes:Source[Int, NotUsed] = ???
val chunks:Source[ByteString, NotUsed] =
partition(bytes, sizes, (grab, count) => ByteString(grab(count)))
Моя первоначальная попытка включает комбинацию Flow#scan и Flow#prefixAndTail, но это не совсем правильно (см. ниже). Я также ознакомился с Framing, но, похоже, это неприменимо к приведенному выше примерному сценарию (и не является достаточно общим для обработки потоков, отличных от строк байтов). Я предполагаю, что мой единственный вариант — использовать Graphs (или более общий FlowOps#transform), но я почти не достаточно опытен (пока) с потоками Akka, чтобы попытаться это сделать.
Вот что я смог придумать до сих пор (конкретно для примера сценария):
val chunks:Source[ByteString, NotUsed] = sizes
.scan(bytes prefixAndTail 0) {
(grouped, count) => grouped flatMapConcat {
case (chunk, remainder) => remainder prefixAndTail count
}
}
.flatMapConcat(identity)
.collect { case (chunk, _) if chunk.nonEmpty => ByteString(chunk:_*) }