Как проверить количество разделов Spark DataFrame без затрат на .rdd

Существует ряд вопросов о том, как получить количество разделов n RDD и / или DataFrame: ответы неизменно следующие:

 rdd.getNumPartitions

or

 df.rdd.getNumPartitions

К сожалению, это дорогостоящая операция на DataFrame, потому что

 df.rdd

требуется преобразование из DataFrame в rdd. Это примерно столько же времени, сколько требуется для запуска.

 df.count

Я пишу логику, которая необязательно repartition или coalesce DataFrame - в зависимости от того, находится ли текущее количество разделов в диапазоне допустимых значений или ниже или выше их.

  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions= inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap{ minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse( maxPartitions.map{ maxp =>
      if (inputPartitions > maxp) {
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
      } else inDf
    }.getOrElse(inDf))
    outDf
  }

Но мы не можем позволить себе оплачивать rdd.getNumPartitions за каждые DataFrame таким образом.

Нет ли способа получить эту информацию - например, от запроса онлайн / временной catalog таблицы registered?

Обновление. Графический интерфейс Spark показал, что операция DataFrame.rdd занимает столько же времени, сколько и самый длинный sql в задании. Я перезапущу задание и немного прикреплю снимок экрана.

Следующее - это всего лишь тестовый пример: он использует небольшую часть размера данных по сравнению с производственными данными. Самый длинный sql - всего пять минут - и этот уже скоро тратит это количество времени тоже (обратите внимание, что sql здесь не помогли: он также должен выполняться впоследствии, что фактически удваивает совокупное время выполнения).

введите описание изображения здесь

Мы можем видеть, что операция .rdd в DataFrameUtils строке 30 (показанной в фрагменте выше) занимает 5,1 минуты - и все же операция save все еще заняла 5,2 минуты позже, т.е. мы не сэкономили время, выполнив .rdd с точки зрения времени выполнения последующих save.


person WestCoastProjects    schedule 19.01.2019    source источник
comment
Здесь задан аналогичный вопрос stackoverflow.com/questions/54269477/   -  person GC001    schedule 10.04.2021


Ответы (2)


Нет внутренней стоимости rdd компонента в rdd.getNumPartitions, потому что возвращенный RDD никогда не оценивается.

Хотя вы можете легко определить это эмпирически, используя отладчик (я оставлю это в качестве упражнения для читателя) или установив, что никакие задания не запускаются в базовом сценарии

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true

этого может быть недостаточно, чтобы убедить вас. Итак, давайте подойдем к этому более систематично:

  • rdd возвращает MapPartitionRDD (ds, как определено выше):

    scala> ds.rdd.getClass
    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
    
  • RDD.getNumPartitions RDD.partitions.

  • В сценарии без контрольной точки _ 12_ вызывает getPartitions (также не стесняйтесь отслеживать путь контрольной точки).
  • RDD.getPartitions .
  • Таким образом, фактическая реализация, используемая в этом случае, - MapPartitionsRDD.getPartitions, что просто делегирует вызов родителю.
  • Между rdd и источником есть только MapPartitionsRDD.

    scala> ds.rdd.toDebugString
    res3: String =
    (1) MapPartitionsRDD[3] at rdd at <console>:26 []
     |  MapPartitionsRDD[2] at rdd at <console>:26 []
     |  MapPartitionsRDD[1] at rdd at <console>:26 []
     |  FileScanRDD[0] at rdd at <console>:26 []
    

    Точно так же, если Dataset содержит обмен, мы будем следовать за родителями до ближайшего тасования:

    scala> ds.orderBy("value").rdd.toDebugString
    res4: String =
    (67) MapPartitionsRDD[13] at rdd at <console>:26 []
     |   MapPartitionsRDD[12] at rdd at <console>:26 []
     |   MapPartitionsRDD[11] at rdd at <console>:26 []
     |   ShuffledRowRDD[10] at rdd at <console>:26 []
     +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
        |  MapPartitionsRDD[5] at rdd at <console>:26 []
        |  FileScanRDD[4] at rdd at <console>:26 []
    

    Обратите внимание, что этот случай особенно интересен, потому что мы фактически запустили задание:

    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
    res5: Boolean = false
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
    res6: Array[Int] = Array(0)
    

    Это потому, что мы столкнулись со сценарием, когда разделы не могут быть определены статически (см. Количество разделов фрейма данных после сортировки? и Почему преобразование sortBy запускает задание Spark?).

    В таком сценарии getNumPartitions также вызовет задание:

    scala> ds.orderBy("value").rdd.getNumPartitions
    res7: Int = 67
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
    res8: Array[Int] = Array(1, 0)
    

    однако это не означает, что наблюдаемая стоимость каким-то образом связана с .rdd звонком. Вместо этого это внутренняя стоимость поиска partitions в случае, когда нет статической формулы (например, некоторые входные форматы Hadoop, где требуется полное сканирование данных).

Обратите внимание, что приведенные здесь замечания не следует экстраполировать на другие приложения Dataset.rdd. Например, ds.rdd.count было бы действительно дорого и расточительно.

person user10938362    schedule 19.01.2019
comment
Моя работа выполняется чуть больше половины времени после удаления .rdd.getNumPartitions calls - javadba - person WestCoastProjects; 13.03.2019
comment
@javadba Я считаю, что мы обсуждали это раньше - это никоим образом не отменяет этот ответ - я прямо заявил, что вычислительные разделы могут быть дорогими и при каких общих условиях, и что ошибка заключается в том, чтобы приписать потенциальную стоимость преобразованию во внешние типы . Кроме того, я предложил рассмотреть любой формальный контрпример (извините, размахивание рукой или анекдотические свидетельства не в счет), который вы предоставили, или полностью пересмотреть (или даже отозвать) мой ответ, если такой гипотетический контрпример докажет, что он в какой-то мере неверен или неполон. Мне действительно нечего здесь предложить. - person user10938362; 14.03.2019
comment
А, ладно, вторая часть ответа, возможно, будет полезна для перехода к (опекунам) вершине: поскольку первая часть выглядит как непризнание достоверности моих наблюдений. Я награжу это. - person WestCoastProjects; 14.03.2019

По моему опыту df.rdd.getNumPartitions работает очень быстро, я никогда не сталкивался с тем, что это занимает больше секунды или около того.

В качестве альтернативы вы также можете попробовать

val numPartitions: Long = df
      .select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()

что позволило бы избежать использования .rdd

person Raphael Roth    schedule 19.01.2019
comment
Попробую: в любом случае проголосую как интересный подход - person WestCoastProjects; 14.03.2019
comment
Повышает ли эта команда производительность по сравнению с df.rdd.getNumPartitions? - person Carlos AG; 01.05.2020
comment
@CarlosAG Я так не думаю - person Raphael Roth; 01.05.2020