Я использую прегель GraphX и потоковую передачу искры. Я хочу, чтобы вершинная программа (vprog) создавала RDD и помещала его в rddQueue для обработки.
val queueOfRDDs:Queue[RDD[Int]] = Queue.empty[RDD[Int]]
@transient val streamingContext:StreamingContext = new StreamingContext(sc, Seconds(1))
val inputDStream = streamingContext.queueStream(queueOfRDDs,true,null)
inputDStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
streamingContext.start()
val initialMessage = "init"
def vertexProgram(id: VertexId, attr: String, msgs: String): String =
{
queueOfRDDs.synchronized {
for(a <- 1 to 3) {
queueOfRDDs.+=sc.makeRDD(1 to 1000, 10)
println("will add " + queueOfRDDs.size)
}
}
msgs
}
def sendMessage(...){...}
def messageCombiner(...){...}
val newGraph = Pregel.apply(graph,initialMessage,1,EdgeDirection.Out)(vertexProgram,sendMessage,messageCombiner)
Ожидаемый результат:
will add1
will add2
will add3
will add4
will add5
will add6
will add7
...
-------------------------------------------
Time: 1503048820000 ms
-------------------------------------------
(0,100)
(6,100)
(3,100)
(9,100)
(4,100)
(1,100)
(7,100)
(8,100)
(5,100)
(2,100)
-------------------------------------------
Time: 1503048820000 ms
-------------------------------------------
(0,100)
(6,100)
(3,100)
(9,100)
(4,100)
(1,100)
(7,100)
(8,100)
(5,100)
(2,100)
...
-------------------------------------------
Time: 1503048820000 ms
-------------------------------------------
(0,100)
(6,100)
(3,100)
(9,100)
(4,100)
(1,100)
(7,100)
(8,100)
(5,100)
(2,100)
Но я получил такой результат:
will add1
will add2
will add3
will add4
will add5
will add6
will add7
...
RDD помещаются в очередь RDD (ее размер увеличивается), но они не обрабатываются. Не могли бы вы мне помочь