Spark GraphX: требование не выполнено: недопустимая начальная емкость

Я новичок в Spark, Scala.

Я пытаюсь выполнить подсчет треугольников в этом наборе данных: DataSet
для хобби-проект

Это код, который я написал до сих пор:

   import org.apache.spark.SparkConf
   import org.apache.spark.SparkContext
   import org.apache.spark.graphx.Edge
   import org.apache.spark.graphx.Graph
   import org.apache.spark.graphx.Graph.graphToGraphOps
   import org.apache.spark.graphx.PartitionStrategy
   import org.apache.spark.rdd.RDD
   import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

   object GraphXApps {
       def main(args: Array[String]): Unit = {
               val conf = new SparkConf()
               .setAppName("GraphXApps")
               .setSparkHome(System.getenv("SPARK_HOME"))
               .setJars(SparkContext.jarOfClass(this.getClass).toList)

               val sc = new SparkContext(conf)

               // Load the edges in canonical order and partition the graph for triangle count
               val edges: RDD[Edge[String]] =
               sc.textFile(args(0)).map { line =>
               val fields = line.split("\t")
               Edge(fields(0).toLong, fields(1).toLong)
               }

               val graph : Graph[String, String] = Graph.fromEdges(edges.sortBy(_.srcId, ascending = true, 1), "defaultProperty").partitionBy(PartitionStrategy.RandomVertexCut)
                       // Find the triangle count for each vertex
                       val triCounts = graph.triangleCount().vertices
                       val triCountById = graph.vertices.join(triCounts).map(_._2._2)   
                       // Print the result
                       println(triCountById.collect().mkString("\n"))

                       sc.stop()

       }
   }

Но я получаю эту ошибку: java.lang.IllegalArgumentException: требование не выполнено: недопустимая начальная емкость

Пожалуйста, дайте мне знать, где я ошибаюсь. Это было бы очень полезно.

Полная трассировка стека

16/10/31 01:03:08 ERROR TaskSetManager: Task 0 in stage 8.0 failed 1 times; aborting job
16/10/31 01:03:08 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool 
16/10/31 01:03:08 INFO TaskSchedulerImpl: Cancelling stage 8
16/10/31 01:03:08 INFO DAGScheduler: ShuffleMapStage 8 (mapPartitions at VertexRDDImpl.scala:245) failed in 0.131 s
16/10/31 01:03:08 INFO DAGScheduler: Job 0 failed: collect at GraphXApps.scala:47, took 3.128921 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8, localhost): java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:51)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:57)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:70)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:69)
    at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:154)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
    at GraphXApps$.main(GraphXApps.scala:47)
    at GraphXApps.main(GraphXApps.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:51)
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:57)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:70)
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:69)
    at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156)
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:154)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

person argumentopruvo    schedule 31.10.2016    source источник
comment
Мне еще предстоит определить причину ошибки, но я смог успешно опробовать ваш сценарий с помощью GraphFrames. Интересно ли это решение?   -  person Denny Lee    schedule 01.11.2016


Ответы (1)


Похоже, это ошибка в Spark 2.0 (до сих пор это проверялось на 2.0, 2.0.1 и 2.0.2). Jira [SPARK-18200]: GraphX ​​недопустима начальная емкость при запуске треугольника. создан для решения этой проблемы.

Ваш код должен нормально работать со Spark 1.6, как указано в этой ссылке блокнот.

Но, как вы заметили, в Spark 2.0 это не удалось, как указано в этой ссылке блокнот.

А пока попробуйте Spark 1.6 или попробуйте использовать GraphFrames, как указано в этой ссылке блокнот.

ХТХ!

person Denny Lee    schedule 01.11.2016
comment
Спасибо, что сообщили об этом как о серьезной ошибке. Я уверен, что люди, занимающиеся наукой о данных, такие как я, найдут это очень полезным, и спасибо за помощь! - person argumentopruvo; 02.11.2016
comment
Рад помочь! Обратите внимание, что GraphFrames обновляется более регулярно и более производительно (поскольку он может использовать DataFrames). Вы можете получить как лучшую производительность, так и простоту использования при использовании GraphFrames. - person Denny Lee; 02.11.2016