Запустите задание из оперативной памяти (java.lang.OutOfMemoryError), даже если ее много. хмх слишком низкий?

Я получаю java.lang.OutOfMemoryError с моим заданием Spark, хотя используется только 20% всей памяти.

Я пробовал несколько конфигураций:

  • 1x n1-highmem-16 + 2x n1-highmem-8
  • 3x n1-highmem-8

Мой набор данных состоит из 1,8 млн записей, прочитанных из локального файла json на главном узле. Весь набор данных в формате json составляет 7 ГБ. Задание, которое я пытаюсь выполнить, включает в себя простое вычисление, за которым следует метод reduceByKey. Ничего экстраординарного. Работа выполняется нормально на моем единственном домашнем компьютере с оперативной памятью всего 32 ГБ (xmx28g), хотя для этого требуется некоторое кэширование на диск.

Задание отправляется через spark-submit локально на сервере (SSH).

Трассировку стека и конфигурацию Spark можно посмотреть здесь: https://pastee.org/sgda

Код

val rdd = sc.parallelize(Json.load()) // load everything
  .map(fooTransform)                  // apply some trivial transformation
  .flatMap(_.bar.toSeq)               // flatten results
  .map(c => (c, 1))                   // count 
  .reduceByKey(_ + _)
  .sortBy(_._2)
log.v(rdd.collect.map(toString).mkString("\n"))

person habitats    schedule 15.02.2016    source источник
comment
Не могли бы вы показать, как вы распределяете RDD по узлам?   -  person Mikel Urkia    schedule 15.02.2016
comment
Я впервые работаю с более чем одним узлом. Я добавил фрагмент соответствующего кода Spark. Нужно ли вручную разбивать его? Извините, если я неправильно понял ваш вопрос. Я все еще учусь.   -  person habitats    schedule 15.02.2016


Ответы (1)


Корень проблемы заключается в том, что вы должны попытаться разгрузить больше операций ввода-вывода для распределенных задач, а не передавать их туда и обратно между программой-драйвером и рабочими задачами. Хотя иногда может быть неочевидно, какие вызовы являются локальными для драйвера, а какие описывают распределенное действие, эмпирические правила включают в себя избегание parallelize и collect, если вам абсолютно не нужны все данные в одном месте. Объемы данных, которые вы можете Json.load() и parallelize, будут максимальными на любом возможном максимальном типе машины, тогда как использование таких вызовов, как sc.textFile, теоретически без проблем масштабируется до сотен ТБ или даже ПБ.

Краткосрочным решением в вашем случае будет попытка передать spark-submit --conf spark.driver.memory=40g ... или что-то в этом диапазоне. По умолчанию Dataproc выделяет менее четверти машины для памяти драйвера, потому что обычно кластер должен поддерживать выполнение нескольких одновременных заданий, а также должен оставлять достаточно памяти на главном узле для именного узла HDFS и диспетчера ресурсов YARN.

В более долгосрочной перспективе вы, возможно, захотите поэкспериментировать с тем, как вы можете загружать данные JSON как RDD напрямую, вместо того, чтобы загружать их в один драйвер и использовать parallelize для его распространения, поскольку таким образом вы можете значительно ускорить время чтения ввода с помощью задач загружать данные параллельно (а также избавиться от предупреждения Stage 0 contains a task of very large size, которое, вероятно, связано с отправкой больших данных от вашего драйвера к рабочим задачам).

Точно так же вместо collect и последующего завершения работы с программой-драйвером вы можете делать такие вещи, как sc.saveAsTextFile, для сохранения распределенным образом, без узких мест в одном месте.

Чтение ввода как sc.textFile будет означать, что JSON разделен строками, и вы можете выполнить синтаксический анализ внутри некоторой задачи map или попробовать использовать sqlContext.read.json. В целях отладки часто вместо использования collect() достаточно просто вызвать take(10), чтобы просмотреть некоторые записи, не отправляя их все драйверу.

person Dennis Huo    schedule 16.02.2016
comment
О, чувак, в тебе много смысла. Оказывается, я не совсем понял разницу между sc.textFile и parallalize. Я думал, что оба работают в стиле sc.textFile, но да, я понимаю, что я идиот. Спасибо! - person habitats; 16.02.2016
comment
О боже, это так быстро сейчас! Ха-ха. Ух ты. Спасибо еще раз! Вы сделали мой день. - person habitats; 16.02.2016
comment
Рад, что получилось! К тонкостям этих распределенных фреймворков, безусловно, нужно привыкнуть :) Если вы еще этого не сделали, вы также можете попробовать загрузить свои данные из Google Cloud Storage (GCS); в Dataproc вы можете использовать любые пути gs:// с sc.textFile, и таким образом вы можете получить распределенный доступ с нескольких узлов или даже нескольких кластеров без большого количества копий в/из локальных файловых систем. Вам просто нужно убедиться, что вы создаете свою корзину GCS в том же регионе, что и ваши виртуальные машины, чтобы избежать задержек и затрат на разных континентах. - person Dennis Huo; 16.02.2016
comment
Да, я действительно пробовал именно это раньше, но я обнаружил, что производительность очень плохая по сравнению с простым сканированием всего на всех узлах перед выполнением. Мой набор данных статичен и постоянен, так что это кажется самым быстрым. Но да, я считаю, что Spark очень сложно отлаживать, и для него не так много бесплатной поддержки, кроме SO. Ваша помощь очень ценится! - person habitats; 17.02.2016