Существует ряд вопросов о том, как получить количество разделов n RDD
и / или DataFrame
: ответы неизменно следующие:
rdd.getNumPartitions
or
df.rdd.getNumPartitions
К сожалению, это дорогостоящая операция на DataFrame
, потому что
df.rdd
требуется преобразование из DataFrame
в rdd
. Это примерно столько же времени, сколько требуется для запуска.
df.count
Я пишу логику, которая необязательно repartition
или coalesce
DataFrame
- в зависимости от того, находится ли текущее количество разделов в диапазоне допустимых значений или ниже или выше их.
def repartition(inDf: DataFrame, minPartitions: Option[Int],
maxPartitions: Option[Int]): DataFrame = {
val inputPartitions= inDf.rdd.getNumPartitions // EXPENSIVE!
val outDf = minPartitions.flatMap{ minp =>
if (inputPartitions < minp) {
info(s"Repartition the input from $inputPartitions to $minp partitions..")
Option(inDf.repartition(minp))
} else {
None
}
}.getOrElse( maxPartitions.map{ maxp =>
if (inputPartitions > maxp) {
info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
inDf.coalesce(maxp)
} else inDf
}.getOrElse(inDf))
outDf
}
Но мы не можем позволить себе оплачивать rdd.getNumPartitions
за каждые DataFrame
таким образом.
Нет ли способа получить эту информацию - например, от запроса онлайн / временной catalog
таблицы registered
?
Обновление. Графический интерфейс Spark показал, что операция DataFrame.rdd занимает столько же времени, сколько и самый длинный sql в задании. Я перезапущу задание и немного прикреплю снимок экрана.
Следующее - это всего лишь тестовый пример: он использует небольшую часть размера данных по сравнению с производственными данными. Самый длинный sql
- всего пять минут - и этот уже скоро тратит это количество времени тоже (обратите внимание, что sql
здесь не помогли: он также должен выполняться впоследствии, что фактически удваивает совокупное время выполнения).
Мы можем видеть, что операция .rdd
в DataFrameUtils
строке 30 (показанной в фрагменте выше) занимает 5,1 минуты - и все же операция save
все еще заняла 5,2 минуты позже, т.е. мы не сэкономили время, выполнив .rdd
с точки зрения времени выполнения последующих save
.