Странные ошибки при запуске задания Spark

Я запускаю искровой кластер с 80 машинами. Каждая машина представляет собой виртуальную машину с 8 ядрами и 50 ГБ памяти (кажется, 41 доступно для Spark).

Я работаю с несколькими входными папками, я оцениваю размер входных данных примерно в 250 ГБ со сжатием gz.

В логе драйвера получаю ошибки не знаю что делать. Примеры (в порядке их появления в логах):

240884 [Result resolver thread-0] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 445.0 in stage 1.0 (TID 445, hadoop-w-59.c.taboola-qa-01.internal): java.net.SocketTimeoutException: Read timed out
        java.net.SocketInputStream.socketRead0(Native Method)
        java.net.SocketInputStream.read(SocketInputStream.java:152)
        java.net.SocketInputStream.read(SocketInputStream.java:122)
        java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
        java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
        java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:687)
        sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1323)
        org.apache.spark.util.Utils$.fetchFile(Utils.scala:376)
        org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:325)
        org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:323)
        scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:323)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)


    271722 [Result resolver thread-3] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 247.0 in stage 2.0 (TID 883, hadoop-w-79.c.taboola-qa-01.internal): java.lang.NullPointerException: 
            org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
            org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
            org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            org.apache.spark.scheduler.Task.run(Task.scala:54)
            org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            java.lang.Thread.run(Thread.java:745)


309052 [Result resolver thread-1] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 272.0 in stage 2.0 (TID 908, hadoop-w-58.c.taboola-qa-01.internal): java.io.IOException: unexpected exception type
        java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)


820940 [connection-manager-thread] INFO org.apache.spark.network.ConnectionManager  - key already cancelled ? sun.nio.ch.SelectionKeyImpl@1c827563
java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

Поскольку мой класс задания (Phase0) не является частью какой-либо трассировки стека, я не уверен, что могу узнать из этих ошибок об источнике проблемы. Какие-либо предложения?

РЕДАКТИРОВАТЬ: В частности, следующее исключение происходит со мной, даже когда я работаю с папкой размером в несколько ГБ:

271722 [Result resolver thread-3] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 247.0 in stage 2.0 (TID 883, hadoop-w-79.c.taboola-qa-01.internal): java.lang.NullPointerException: 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
            org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
            org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            org.apache.spark.scheduler.Task.run(Task.scala:54)
            org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            java.lang.Thread.run(Thread.java:745)

person Yaniv Donenfeld    schedule 24.10.2014    source источник
comment
Какая это версия Спарка?   -  person Josh Rosen    schedule 25.10.2014
comment
Я использую Spark 1.1.   -  person Yaniv Donenfeld    schedule 25.10.2014


Ответы (1)


Решение не относится к исключениям, упомянутым здесь, но в конечном итоге мне удалось решить все проблемы в Spark, используя следующие рекомендации:

  1. Все машины должны быть настроены с точки зрения ulimit и памяти процессов следующим образом:

Добавляем в /etc/security/limits.conf следующее:

hadoop soft nofile 900000
root soft nofile 900000
hadoop hard nofile 990000
root hard nofile 990000
hadoop hard memlock unlimited
root hard memlock unlimited
hadoop soft memlock unlimited
root soft memlock unlimited

В /etc/pam.d/common-session и в /etc/pam.d/common-session-noninteractive:

"session required pam_limits.so"
  1. Использование ядра: при использовании виртуальной машины я бы рекомендовал выделить n-1 ядра для Spark и оставить 1 ядро ​​для связи и других задач.

  2. Разделы: я бы рекомендовал использовать количество разделов, которое в 5-10 раз превышает количество используемых ядер в кластере. Если вы видите ошибки «недостаточно памяти», вам необходимо увеличить количество разделов (сначала за счет увеличения коэффициента, а затем за счет добавления машин в кластер)

  3. Вывод массива по ключу. Если вы видите такие ошибки, как: «Размер массива превышает лимит виртуальной машины», возможно, у вас слишком много данных для каждого ключа, и поэтому вам нужно уменьшить объем данных для каждого ключа. Например, если вы выводите файлы с интервалом в 1 час, попробуйте уменьшить интервал до 10 минут или даже до 1 минуты.

  4. Если вы все еще видите ошибки, найдите их в отчетах об ошибках Spark. Возможно, вы захотите убедиться, что обновили Spark до последней версии. Для меня в настоящее время в версии 1.2 исправлена ​​ошибка, которая приводила к сбою моей работы.

  5. Используйте регистратор Kryo, преобразуйте всю логику преобразования RDD для запуска каждого из них в отдельном классе и обязательно зарегистрируйте все эти классы с помощью Kryo.

person Yaniv Donenfeld    schedule 05.01.2015