GraphX ​​pregel и spark Streaming: RDD, помещенные в rddQueue внутри vprog, не обрабатываются.

Я использую прегель GraphX ​​и потоковую передачу искры. Я хочу, чтобы вершинная программа (vprog) создавала RDD и помещала его в rddQueue для обработки.

val queueOfRDDs:Queue[RDD[Int]] = Queue.empty[RDD[Int]]        
@transient val streamingContext:StreamingContext = new StreamingContext(sc, Seconds(1))    
val inputDStream = streamingContext.queueStream(queueOfRDDs,true,null)
inputDStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
streamingContext.start()

val initialMessage = "init"

def vertexProgram(id: VertexId, attr: String, msgs: String): String =
  {
    queueOfRDDs.synchronized {
      for(a <- 1 to 3) {
        queueOfRDDs.+=sc.makeRDD(1 to 1000, 10)
        println("will add " + queueOfRDDs.size)
      }
    }
    msgs
  }

  def sendMessage(...){...}
  def messageCombiner(...){...}
  val newGraph = Pregel.apply(graph,initialMessage,1,EdgeDirection.Out)(vertexProgram,sendMessage,messageCombiner)

Ожидаемый результат:

will add1
    will add2
    will add3
    will add4
    will add5
    will add6
    will add7
    ...
    -------------------------------------------
    Time: 1503048820000 ms
    -------------------------------------------
    (0,100)
    (6,100)
    (3,100)
    (9,100)
    (4,100)
    (1,100)
    (7,100)
    (8,100)
    (5,100)
    (2,100)

    -------------------------------------------
    Time: 1503048820000 ms
    -------------------------------------------
    (0,100)
    (6,100)
    (3,100)
    (9,100)
    (4,100)
    (1,100)
    (7,100)
    (8,100)
    (5,100)
    (2,100)

    ...

    -------------------------------------------
    Time: 1503048820000 ms
    -------------------------------------------
    (0,100)
    (6,100)
    (3,100)
    (9,100)
    (4,100)
    (1,100)
    (7,100)
    (8,100)
    (5,100)
    (2,100)

Но я получил такой результат:

    will add1
    will add2
    will add3
    will add4
    will add5
    will add6
    will add7
    ...

RDD помещаются в очередь RDD (ее размер увеличивается), но они не обрабатываются. Не могли бы вы мне помочь


person DaliMidou    schedule 18.08.2017    source источник


Ответы (1)


TL;DR: это не работает.

Этот код выглядит неправильно. Похоже, вы пытаетесь создать инициализацию RDD из задачи (vertexProgram), возможно, сделав SparkContext ленивой или используя объектную оболочку.

Ваша программа добавляется к локальной копии Queue, которая не видна для фактической программы драйвера. Даже если бы это было RDDs, это соответствовало бы разным контекстам.

person Alper t. Turker    schedule 18.08.2017