Ошибка при создании графа: требование не выполнено: входы [] и выходы [] должны соответствовать входам [вход] и выходам [выход]

Я использую akka streams graphDSL для создания работающего графа. Ошибок времени компиляции на входе/выходе компонентов потока нет. Среда выполнения выдает следующую ошибку:

Любые идеи, что я должен проверить, чтобы заставить его работать?

requirement failed: The inlets [] and outlets [] must correspond to the inlets [in] and outlets [out]
at scala.Predef$.require(Predef.scala:219)
at akka.stream.Shape.requireSamePortsAs(Shape.scala:168)
at akka.stream.impl.StreamLayout$CompositeModule.replaceShape(StreamLayout.scala:390)
at akka.stream.scaladsl.GraphApply$class.create(GraphApply.scala:18)
at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:813)
at com.flipkart.connekt.busybees.streams.Topology$.bootstrap(Topology.scala:109)
at com.flipkart.connekt.busybees.BusyBeesBoot$.start(BusyBeesBoot.scala:65)
at com.flipkart.connekt.boot.Boot$.delayedEndpoint$com$flipkart$connekt$boot$Boot$1(Boot.scala:39)
at com.flipkart.connekt.boot.Boot$delayedInit$body.apply(Boot.scala:13)

Структура графа:

source ~> flowRate ~> render ~> platformPartition.in
platformPartition.out(0) ~> formatIOS ~> apnsDispatcher ~> apnsEventCreator ~> merger.in(0)
platformPartition.out(1) ~> formatAndroid ~> httpDispatcher ~> gcmPoolFlow ~> rHandlerGCM ~> merger.in(1)
merger.out ~> evtCreator ~> Sink.ignore

person phantomastray    schedule 17.02.2016    source источник
comment
Можете ли вы опубликовать свой план графика?   -  person manub    schedule 17.02.2016
comment
Я обновил вопрос со структурой графа. Все параметры источника/потока/приемника совпадают на входе/выходе.   -  person phantomastray    schedule 17.02.2016
comment
Также могут быть полезны типы render, platformPartition, merger и evtCreator.   -  person manub    schedule 17.02.2016
comment
Я думаю, что более широкая часть вашего кода может быть полезна.   -  person manub    schedule 17.02.2016


Ответы (1)


У вас есть неиспользуемый вход или выход (один из ваших потоков или что-то не подключено со всех сторон). Вот некоторые примеры:

Это работает:

val workingFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val intFlow = b.add(Flow[Int])
    FlowShape(intFlow.in, intFlow.out)
  })

Следующий код выдает ошибку, похожую на вашу, потому что у него есть весь неиспользуемый поток:

val buggyFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val intFlow = b.add(Flow[Int])
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow is unused
    FlowShape(intFlow.in, intFlow.out)
  })

Чуть более сложный пример: здесь не весь неиспользуемый поток, а просто неиспользуемый выход. Выдает ту же ошибку:

val buggyFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val broadcast = b.add(Broadcast[Int](2))
    val intFlow = b.add(Flow[Int])
    val unusedFlow = b.add(Flow[Int]) // ERROR: This flow's outlet isn't used

    broadcast ~> intFlow
    broadcast ~> unusedFlow

    FlowShape(broadcast.in, intFlow.out)
  })
person Hibuki    schedule 02.03.2016
comment
да, мы поняли это давно. Сообщение об ошибке не является интуитивно понятным. Я должен был обновить вопрос. :) Спасибо, хотя за изучение этого. - person phantomastray; 03.03.2016