Как напечатать один val в PartitionBy

У меня есть одна проблема в Apache Spark GraphX, я попытался разбить один граф с помощью этого метода в основном:

graph.partitionBy(HDRF, 128)

HDRF - это метод создания разделов, я хотел бы распечатать val, который находится внутри него, я пытался распечатать, но он ничего не печатает.

/ИЗМЕНИТЬ/

package app

import org.apache.spark.graphx._
import org.apache.spark._
import org.apache.spark.rdd.RDD


/**
  * Main del sistema
  */

object Main{


  def main(args: Array[String]) {

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("HDRF"))

    // mostra solo i log in caso di errore
    sc.setLogLevel("ERROR")

    //modifico il file di testo preso in ingresso
    val edges:RDD[Edge[String]]=
      sc.textFile("data/u1.base").map{ line =>
        val fields= line.split("\t")
        Edge(fields(0).toLong,fields(1).toLong,fields(2))
      }

    val graph: Graph[Any,String] =Graph.fromEdges(edges,"defaultProperty")

    graph.partitionBy(HDRF,128)


  }
}

.

package app

import org.apache.spark.graphx._
import scala.collection.concurrent.TrieMap

object HDRF extends PartitionStrategy{
  private var init=0; //lo puoi usare per controllare una fase di inizializzazione che viene eseguita solo la prima volta

  private var partitionsLoad:Array[Long] = Array.empty[Long] //carico (numero di archi) di ogni partizione
  private val vertexIdListPartitions: TrieMap[Long, List[Long]] = TrieMap() //lista di partizioni associate a ogni vertice
  private val vertexIdEdges: TrieMap[Long, Long] = TrieMap() //grado di ogni vertice

  private var edges = 0

  private var sum :Long= 0

  override def getPartition(src:VertexId,dst:VertexId,numParts:Int): PartitionID ={
    var valoreMax:Long =Int.MaxValue
    var partScarica:Int = -1
    var c:Int = 0
    if(init==0){
      init=1
      partitionsLoad=Array.fill[Long](numParts)(0)
    }


    //AGGIORNA IL GRADO CONOSCIUTO DEI VERTICI src E dst NELLA VARIABILE vertexIdEdges
    vertexIdEdges(src)=vertexIdEdges(src)+1
    vertexIdEdges(dst)=vertexIdEdges(dst)+1
    sum=vertexIdEdges(src) + vertexIdEdges(dst)

    //PARTIZIONA IL GRAFO
    if((!vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){
      //NESSUNO DEI DUE VERTICI E' STATO MAI INSERITO IN QUALCHE PARTIZIONE
      //SCELGO LA PARTZIIONE PIU' SCARICA E LI ASSEGNO A QUELLA
      while(c==numParts){
        if(partitionsLoad(c)<valoreMax){
          valoreMax=partitionsLoad(c)
          partScarica=c
        }
        c=c+1
      }
      if(partScarica != -1) {
        partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
        vertexIdListPartitions(partScarica).union(List(src, dst))
      }
      return partScarica

    }else if(((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst)))||((!vertexIdListPartitions.contains(src))&&(vertexIdListPartitions.contains(dst)))){
      //UNO SOLO DEI DUE VERTICI E' GIA' PRESENTE IN ALMENO UNA PARTIZIONE
      if((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){
        //SI TRATTA DI src
        //SCELGO LA PARTIZIONE PIU' SCARICA TRA QUELLE IN CUI E' PRESENTE src E CI REPLICO dst
        while(c==numParts){
          if(partitionsLoad(c)<valoreMax){
            if(vertexIdListPartitions(c).contains(src)) {
              valoreMax = partitionsLoad(c)
              partScarica = c
            }
          }
          c=c+1
        }
        if(partScarica != -1) {
          partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
          vertexIdListPartitions(partScarica).union(List(dst))
        }

      }else{
        //SI TRATTA DI dst
        //SCELGO LA PARTZIIONE PIU' SCARICA TRA QUELLE IN CUI E' PRESENTE dst E CI REPLICO src

        while(c==numParts){
          if(partitionsLoad(c)<valoreMax){
            if(vertexIdListPartitions(c).contains(src)) {
              valoreMax = partitionsLoad(c)
              partScarica = c
            }
          }
          c=c+1
        }
        if(partScarica != -1) {
          partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
          vertexIdListPartitions(partScarica).union(List(src))
        }

      }
    }else if(!vertexIdListPartitions(src).intersect(vertexIdListPartitions(dst)).isEmpty){
      //ENTRAMBI I VERTICI SONO PRESENTI IN DIVERSE PARTIZIONI ED ESISTE UNA INTERSEZIONE DEI SET NON NULLA (CIOE' ESISTE ALMENO UNA PARTIZIONE CHE LI CONTIENE ENTRAMBI)
      //SCELGO NELL'INTERSEZIONE DEI SET LA PARTIZIONE PIU' SCARICA

      while(c==numParts) {
        if (partitionsLoad(c) < valoreMax) {
          if (vertexIdListPartitions(c).contains(src) && vertexIdListPartitions(c).contains(dst)) {
            valoreMax = partitionsLoad(c)
            partScarica = c
          }
        }
        c = c + 1
      }
      if(partScarica != -1) {
        partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
        vertexIdListPartitions(partScarica).union(List(src))
      }

    }else {
      //ENTRAMBI I VERTICI SONO PRESENTI IN DIVERSE PARTIZIONI MA L'INTERSEZIONE DEI SET E' NULLA (CIOE' NON ESISTE ALCUNA PARTIZIONE CHE LI CONTIENE ENTRAMBI)
      if((vertexIdEdges(src))>=(vertexIdEdges(dst))){
        //SCELGO TRA LE PARTIZIONI A CUI E' ASSEGNATO dst QUELLA PIU' SCARICA E CI COPIO src

        while(c==numParts){
          if(partitionsLoad(c)<valoreMax){
            if(vertexIdListPartitions(c).contains(dst)) {
              valoreMax = partitionsLoad(c)
              partScarica = c
            }
          }
          c=c+1
        }
        if(partScarica != -1) {
          partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
          vertexIdListPartitions(partScarica).union(List(src))
        }

      }else{
        //SCELGO TRA LE PARTIZIONI A CUI E' ASSEGNATO src QUELLA PIU' SCARICA E CI COPIO dst

        while(c==numParts){
          if(partitionsLoad(c)<valoreMax){
            if(vertexIdListPartitions(c).contains(src)) {
              valoreMax = partitionsLoad(c)
              partScarica = c
            }
          }
          c=c+1
        }
        if(partScarica != -1) {
          partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
          vertexIdListPartitions(partScarica).union(List(dst))
        }
      }

    }
    edges=edges+1
    if(edges==80000) {
      print(sum)
    }
    return partScarica
  }
}

Мне нужно напечатать сумму, но я не понимаю, почему она не появляется.


person Andrea Cingolani    schedule 06.12.2017    source источник
comment
Во-первых, трудно ответить на вопрос, не видя реализации HDRF. Во-вторых, уточните, что вы ожидаете - когда и где вы ожидаете увидеть эту распечатку? HDRF будет использоваться Spark на различных рабочих узлах (т. е. не обязательно на драйвере, на котором работает ваш основной) и только тогда, когда результат этого разделения используется в действии (которое запускает ленивое вычисление график).   -  person Tzach Zohar    schedule 06.12.2017
comment
Я отредактировал пост и поместил сюда код   -  person Andrea Cingolani    schedule 06.12.2017


Ответы (1)


partitionBy, как и многие функции Graph, представляет собой операцию с ленивой оценкой, которая создает новый объект Graph, но на самом деле не вычисляет этот график до тех пор, пока в этом нет необходимости, то есть до тех пор, пока над результатом не будет выполнено какое-либо действие (например, подсчета, сохранения или сбора).

На более простом примере мы можем видеть, что если мы воздействуем на результат, эти отпечатки будут видны:

object SimpleExample extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    println("partitioning!")
    numParts
  }
}

val result = graph.partitionBy(SimpleExample, 128) // nothing printed so far...

result.edges.count() // now that we act on the result, 
// we see "paritioning!" printed (several times). 

ПРИМЕЧАНИЕ, что печать из PartitionStrategy (или любой функции преобразования, переданной в Spark для выполнения на RDD, Graph или Dataset) не очень полезна: эти функции выполняются на воркере nodes, поэтому эти отпечатки будут "разбросаны" по выводам разных процессов на разных машинах и, вероятно, НЕ будут видны в выводе приложения-драйвера (вашей основной функции).

person Tzach Zohar    schedule 06.12.2017