SparkR org.apache.spark.SparkException: рабочий процесс R неожиданно завершился

Я пытаюсь выполнить SparkR gapply , по сути, когда я пытаюсь запустить это с моим входным файлом, ограниченным примерно 300 тыс. строк, он работает, однако при масштабировании примерно до 1,2 млн строк я получаю следующее повторяющееся исключение в stderr во многих задачах исполнителя: примерно 70% задач завершены, в то время как другие провалены или убиты. Неудачные имеют тот же вывод ошибки:

org.apache.spark.SparkException: R worker exited unexpectedly (cranshed)
    at org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:240)
    at org.apache.spark.api.r.RRunner$$anon$1.next(RRunner.scala:91)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:346)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:212)
    ... 16 more

Какие параметры настройки следует учитывать помимо выделения большего объема памяти? Я считаю, что SparkR не так широко используется, как PySpark или Scala, и иногда их параметры настройки могут отличаться, поэтому любая помощь здесь будет очень признательна.

Это работает в кластере Databricks/AWS — 20 рабочих узлов, 30,5 ГБ памяти, 4 ядра каждый.

В нашем случае использования функция gapply работает максимум с 10 кадрами данных строк, разбивает максимум 20 столбцов на 4 кадра данных R, которые затем передаются в решатель линейной оптимизации с использованием пакетов R NlcOptim,quadprog.


person and_apo    schedule 02.05.2018    source источник
comment
gapply эффективно groupByKey, поэтому вызывает те же проблемы, и без конкретных ограничений на ввод, просто не масштабируйте. Исключение, которое вы, вероятно, является результатом сбоя, вызванного перекосом данных. R делает это хуже, но, вероятно, не настолько.   -  person Alper t. Turker    schedule 02.05.2018
comment
Итак, теоретически, как вы думаете, я мог бы разделить свой входной фрейм данных на N фреймов данных, а затем последовательно выполнить N функций gapply?   -  person and_apo    schedule 02.05.2018
comment
Я думаю, было бы полезно, если бы вы предоставили некоторый контекст того, что делает gapply. Может быть, есть лучший способ решить эту проблему.   -  person Alper t. Turker    schedule 02.05.2018
comment
Обновлен вопрос с (надеюсь) полезной информацией   -  person and_apo    schedule 02.05.2018
comment
Разве вы не можете разделить данные перед вызовом gapply и таким образом использовать меньшие группы?   -  person Alper t. Turker    schedule 02.05.2018
comment
Нет, входные группы настолько малы, насколько это возможно, то, чего я хочу достичь (линейная оптимизация), должно выполняться для каждой группы в целом.   -  person and_apo    schedule 02.05.2018
comment
Может быть, я неправильно вас понял. Таким образом, gapply уже получает 10 строк данных для каждой группы.   -  person Alper t. Turker    schedule 02.05.2018
comment
Давайте продолжим обсуждение в чате.   -  person and_apo    schedule 02.05.2018
comment
проверьте и дважды проверьте, что масштабирование непреднамеренно не привело к некоторым пограничным случаям в вашем коде, которые вызывают ошибки   -  person MichaelChirico    schedule 27.08.2018


Ответы (1)


Используйте .cache() и повторите попытку, чтобы решить эту проблему.

person and_apo    schedule 08.04.2020