Как создать поток с разными входными и выходными типами для использования внутри графика?

Я делаю собственную раковину, строя график внутри. Вот широкое упрощение моего кода, чтобы продемонстрировать мой вопрос:

def mySink: Sink[Int, Unit] = Sink() { implicit builder =>

    val entrance = builder.add(Flow[Int].buffer(500, OverflowStrategy.backpressure))
    val toString = builder.add(Flow[Int, String, Unit].map(_.toString))
    val printSink = builder.add(Sink.foreach(elem => println(elem)))

    builder.addEdge(entrance.out, toString.in)
    builder.addEdge(toString.out, printSink.in)

    entrance.in
}

Проблема, с которой я сталкиваюсь, заключается в том, что, хотя можно создать поток с теми же типами ввода/вывода только с одним аргументом типа и без аргумента значения, например: Flow[Int] (который есть во всей документации), недопустимо только поставлять два параметра типа и параметры с нулевым значением.

Согласно справочная документация для объекта Flow метод apply, который я ищу, определяется как

def apply[I, O]()(block: (Builder[Unit]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit]

и говорит

Создает поток, передавая FlowGraph.Builder в заданную функцию создания.

Ожидается, что функция создания вернет пару входных и выходных портов, которые соответствуют созданным портам ввода и вывода потоков.

Кажется, что мне нужно иметь дело с другим уровнем построения графиков, когда я пытаюсь создать то, что я считаю очень простым потоком. Есть ли более простой и лаконичный способ создания потока, который изменяет тип его ввода и вывода, не требуя возиться с его внутренними портами? Если это правильный подход к этой проблеме, как будет выглядеть решение?

ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ: Почему легко создать поток, который не меняет тип входных данных на выходные?


person Asa    schedule 14.05.2015    source источник
comment
недопустимо указывать только два параметра типа и параметры с нулевым значением. Какова будет семантика таких потоков? Вы думаете о более короткой форме Flow[Int].map(_.toString)?   -  person jrudolph    schedule 14.05.2015
comment
Насколько я понимаю, Flow[Int].map(_.toString) недействителен, поскольку Flow[Int] указывает на поток от Int до Int. Однако тип вашей функции карты (_.toString) — Int => String. Это неправильно? (Ваш комментарий и взгляд на документы свежим взглядом заставляют меня подозревать, что я не прав)   -  person Asa    schedule 14.05.2015


Ответы (1)


Если вы хотите указать как входной, так и выходной тип потока, вам действительно нужно использовать метод применения, который вы нашли в документации. Однако его использование делается почти так же, как вы уже делали.

Flow[String, Message]() { implicit b =>
  import FlowGraph.Implicits._

  val reverseString = b.add(Flow[String].map[String] { msg => msg.reverse })
  val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))

  // connect the graph
  reverseString ~> mapStringToMsg

  // expose ports
  (reverseString.inlet, mapStringToMsg.outlet)
}

Вместо того, чтобы просто возвращать вход, вы возвращаете кортеж с входом и выходом. Теперь этот поток можно использовать (например, внутри другого компоновщика или напрямую с runWith) с определенным источником или приемником.

person Jos Dirksen    schedule 23.05.2015
comment
Весь этот ответ хорош, но часть, которую я специально искал, это Flow[String].map[Message]( x => TextMessage.Strict(x)). Спасибо! - person Asa; 24.05.2015