Edgetriples не транслируются должным образом

Я создал график с помощью graphx, и теперь мне нужно извлечь подграфы из исходного графика. В следующем коде я пытаюсь транслировать edgetriples и фильтровать их для каждого идентификатора пользователя.

class VertexProperty(val id:Long) extends Serializable
case class User(val userId:Long, var offset:Int, val userCode:String, val Name:String, val Surname:String, val organizational_unit:String, val UME:String, val person_type:String, val SOD_HIGH:String, val SOD_MEDIUM:String, val SOD_LOW:String, val Under_mitigated:String) extends VertexProperty(userId)
case class Account(val accountId:Long, var offset:Int, val userCode:String, val userId:String, val account_creation_date:String, var disabled:String, var forcechangepwd:String, var pwdlife:String, var numberloginerror:String, var lastchangepwd:String, var lastlogin:String, var lastwronglogin:String, var state:String, var expire:String, var last_cert_time:String, var creation_date:String, var creation_user:String,var challenge_counter:String, var challenge_failed_attempt:String) extends VertexProperty(accountId) //Check if userCode is actually the code in this example.
case class Application(var applicationId:Long, var offset:Int, var Name:String, var Description:String, var Target:String, var Owner:String, var Ownercode:String, var Creation_date:String, var Creation_user:String) extends VertexProperty(applicationId)
case class Entitlement(val entitlementId:Long, var offset:Int, val Name:String, var Code:String, var Description:String, var Type:String, var Application:String, var Administrative:String, var Parent_ID:String, var Owner_code:String, var Scope_type:String, var Business_name:String, var Business_policy:String, var SOD_high:String, var SOD_medium:String, var SOD_low:String) extends VertexProperty(entitlementId)

def compute_user_triplets(uId:String, bcast_triplets:Broadcast[Array[EdgeTriplet[VertexProperty,String]]]):ArrayBuffer[EdgeTriplet[VertexProperty, String]] = {
    var user_triplets = ArrayBuffer[EdgeTriplet[VertexProperty, String]]()
    var triplets = bcast_triplets.value
    for(x <- triplets){
        if(x.attr == uId){
            user_triplets += x
        }
    }
    return user_triplets
}

//Some code for computing vertexRDD and edges
val edges : RDD[Edge[String]] = sc.union(user_account_edges, account_application_edges, user_entitlement_edges)
val vertexRDD: RDD[(VertexId, VertexProperty)] = vertices.map(t => (t.id, t)) 

val graph: Graph[VertexProperty,String] = Graph(vertexRDD, edges, new VertexProperty(-1))
val triplets = graph.triplets
val temp = triplets.map(t => t.attr)
val distinct_users = temp.distinct.filter(t => t != "NULL")

val bcast_triplets = sc.broadcast(triplets.collect())
val users_triplets = distinct_users.map(uId => compute_user_triplets(uId, bcast_triplets)) 

Но я получаю сообщение об ошибке ниже после запуска последней строки кода. Почему я получаю эту ошибку?"

org.apache.spark.SparkException: Task not serializable


person Anshit Chaudhary    schedule 08.06.2016    source источник
comment
Как определяются значения vertexRDD и edges? Вероятно, вы генерируете один или оба из них динамически, и что-то в этом коде не сериализуемо.   -  person David Griffin    schedule 08.06.2016
comment
Я добавил код для vertexRDD и ребер. Я не могу понять, что именно в моем коде не сериализуемо.   -  person Anshit Chaudhary    schedule 09.06.2016
comment
Вы не весь код выложили. Опубликуйте все это. Не могу вам помочь, пока вы не опубликуете все.   -  person David Griffin    schedule 09.06.2016