Я пытаюсь выполнить SparkR gapply
, по сути, когда я пытаюсь запустить это с моим входным файлом, ограниченным примерно 300 тыс. строк, он работает, однако при масштабировании примерно до 1,2 млн строк я получаю следующее повторяющееся исключение в stderr во многих задачах исполнителя: примерно 70% задач завершены, в то время как другие провалены или убиты. Неудачные имеют тот же вывод ошибки:
org.apache.spark.SparkException: R worker exited unexpectedly (cranshed)
at org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:240)
at org.apache.spark.api.r.RRunner$$anon$1.next(RRunner.scala:91)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:346)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:212)
... 16 more
Какие параметры настройки следует учитывать помимо выделения большего объема памяти? Я считаю, что SparkR не так широко используется, как PySpark или Scala, и иногда их параметры настройки могут отличаться, поэтому любая помощь здесь будет очень признательна.
Это работает в кластере Databricks/AWS — 20 рабочих узлов, 30,5 ГБ памяти, 4 ядра каждый.
В нашем случае использования функция gapply
работает максимум с 10 кадрами данных строк, разбивает максимум 20 столбцов на 4 кадра данных R, которые затем передаются в решатель линейной оптимизации с использованием пакетов R NlcOptim,quadprog
.
gapply
эффективноgroupByKey
, поэтому вызывает те же проблемы, и без конкретных ограничений на ввод, просто не масштабируйте. Исключение, которое вы, вероятно, является результатом сбоя, вызванного перекосом данных. R делает это хуже, но, вероятно, не настолько. - person Alper t. Turker   schedule 02.05.2018gapply
. Может быть, есть лучший способ решить эту проблему. - person Alper t. Turker   schedule 02.05.2018gapply
и таким образом использовать меньшие группы? - person Alper t. Turker   schedule 02.05.2018gapply
уже получает 10 строк данных для каждой группы. - person Alper t. Turker   schedule 02.05.2018