PySpark reduceByKey вызывает нехватку памяти

Я пытаюсь запустить задание в режиме Yarn, которое обрабатывает большой объем данных (2 ТБ), считанных из облачного хранилища Google. Мой конвейер отлично работает с 10 ГБ данных. Характеристики моего кластера и начало моего конвейера подробно описаны здесь: Приложение PySpark Yarn не работает группировать

Вот остальная часть конвейера:

      input.groupByKey()\
      [...] processing on sorted groups for each key shard
      .mapPartitions(sendPartition)\
      .map(mergeShardsbyKey)
      .reduceByKey(lambda list1, list2: list1 + list2).take(10)
      [...] output

функция карты, которая применяется к разделам, следующая:

def sendPartition(iterator):
    pool = external_service_connection_pool()
    return [make_request(record, pool) for record in iterator]

def make_request(record, pool):
    [...] tags a record based on query results from the external service
    return key, taggedrecord

Во всем наборе данных выполнение не выполняется из-за:

java.lang.OutOfMemoryError: Java heap space

Я попытался получить немного больше информации, и я видел, что это не удается на reduceByKey, однако, начиная с mapPartitions, задача выполняется только на одном исполнителе до тех пор, пока не произойдет сбой на редукции (по крайней мере, только на одном исполнитель отображается в веб-интерфейсе Spark, и задание не разбивается на несколько задач до тех пор, пока не будет уменьшено)

Мой вопрос заключается в следующем: почему он работает только на 1 исполнителя? Несмотря на то, что документация, описывающая функцию, кажется, соответствует моему представлению о mapPartitions (http://spark.apache.org/docs/latest/programming-guide.html), это сбой или он должен работать после этого groupByKey?

РЕДАКТИРОВАТЬ: я попробовал меньший кластер с меньшим набором данных, и хотя это удалось, для обработки всех данных после groupByKey используется только один исполнитель. Более того, после каждого этапа есть несколько разделов, и этап groupByKey отмечается как «ожидающий» для каждого этапа после него в интерфейсе, когда я запускаю этапы один за другим.


person Paul K.    schedule 05.11.2015    source источник
comment
попробуйте указать NumTasks для reduceByKey   -  person Karthik    schedule 05.11.2015
comment
Извините, не очень понятно, редукция разбивается на задачи, а не mapPartitions и редукция использует все воркеры. Я просто подумал, что они могут быть как-то связаны.   -  person Paul K.    schedule 05.11.2015
comment
Используется ли использование mapPartitions в основном для максимально возможной оптимизации повторного использования пула соединений?   -  person Dennis Huo    schedule 05.11.2015
comment
да, он используется для этого, я знаю, что могу объединить разделы, чтобы иметь только один пул соединений на каждого рабочего, но я не думаю, что это проблема, поскольку мой внешний сервис не получает такой большой нагрузки, учитывая количество разделов ( около 58000). Странно то, что reduceByKey уже работает очень медленно на небольшом наборе данных (в 15 раз дольше, чем группа на небольшом кластере).   -  person Paul K.    schedule 05.11.2015
comment
Что произойдет, если вы добавите repartition (1000) перед mapPartitions?   -  person Dennis Huo    schedule 05.11.2015
comment
Я собираюсь попробовать прямо сейчас. Я тестировал вызов getNumPartitions между каждым этапом, это всегда 58000, хотя есть только 1 задача и 1 исполнитель. Вы думаете, что каждый раз RDD сжимается до одного раздела?   -  person Paul K.    schedule 05.11.2015
comment
Одна из возможностей заключается в том, что mapPartitions не повезло с динамическим распределением искры; на этом этапе также только 1 зарегистрированный исполнитель или другие исполнители просто не используются? В любом случае вы можете попробовать установить spark.executor.instances=99999 как --conf или в gcloud dataproc, --properties и посмотреть, имеет ли это значение   -  person Dennis Huo    schedule 05.11.2015
comment
этапы объединяются в 2 группы в веб-интерфейсе (уменьшить и сгруппировать), когда я использую инструмент командной строки gcloud dataproc для отправки, поэтому я не вижу часть mapPartitions. Однако сокращение по-прежнему кажется таким же медленным, как и на маленьком кластере. Завтра я проверю на более крупном кластере и, подключившись к мастеру, разделю задачи, чтобы увидеть, использует ли mapPartitions все исполнители.   -  person Paul K.    schedule 05.11.2015
comment
Давайте продолжим обсуждение в чате.   -  person Paul K.    schedule 06.11.2015