Я пытаюсь запустить задание в режиме Yarn, которое обрабатывает большой объем данных (2 ТБ), считанных из облачного хранилища Google. Мой конвейер отлично работает с 10 ГБ данных. Характеристики моего кластера и начало моего конвейера подробно описаны здесь: Приложение PySpark Yarn не работает группировать
Вот остальная часть конвейера:
input.groupByKey()\
[...] processing on sorted groups for each key shard
.mapPartitions(sendPartition)\
.map(mergeShardsbyKey)
.reduceByKey(lambda list1, list2: list1 + list2).take(10)
[...] output
функция карты, которая применяется к разделам, следующая:
def sendPartition(iterator):
pool = external_service_connection_pool()
return [make_request(record, pool) for record in iterator]
def make_request(record, pool):
[...] tags a record based on query results from the external service
return key, taggedrecord
Во всем наборе данных выполнение не выполняется из-за:
java.lang.OutOfMemoryError: Java heap space
Я попытался получить немного больше информации, и я видел, что это не удается на reduceByKey
, однако, начиная с mapPartitions
, задача выполняется только на одном исполнителе до тех пор, пока не произойдет сбой на редукции (по крайней мере, только на одном исполнитель отображается в веб-интерфейсе Spark, и задание не разбивается на несколько задач до тех пор, пока не будет уменьшено)
Мой вопрос заключается в следующем: почему он работает только на 1 исполнителя? Несмотря на то, что документация, описывающая функцию, кажется, соответствует моему представлению о mapPartitions
(http://spark.apache.org/docs/latest/programming-guide.html), это сбой или он должен работать после этого groupByKey
?
РЕДАКТИРОВАТЬ: я попробовал меньший кластер с меньшим набором данных, и хотя это удалось, для обработки всех данных после groupByKey используется только один исполнитель. Более того, после каждого этапа есть несколько разделов, и этап groupByKey отмечается как «ожидающий» для каждого этапа после него в интерфейсе, когда я запускаю этапы один за другим.