Как распаковать набор данных (используя свод)?

Я попробовал новую функцию «поворота» версии 1.6 на больший набор данных. Он имеет 5 656 458 строк, а столбец IndicatorCode содержит 1344 различных кода.

Идея заключалась в том, чтобы использовать свод для «распаковки» (в терминах pandas) этого набора данных и иметь столбец для каждого IndicatorCode.

schema = StructType([ \
   StructField("CountryName", StringType(), True), \
   StructField("CountryCode", StringType(), True), \
   StructField("IndicatorName", StringType(), True), \
   StructField("IndicatorCode", StringType(), True), \
   StructField("Year", IntegerType(), True), \
   StructField("Value", DoubleType(), True)  \
])

data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv', 
                            format='com.databricks.spark.csv', 
                            header='true', 
                            schema=schema)

data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", "\.", "_"))\
                      .select(["CountryCode", "IndicatorCode2", "Year", "Value"])

columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]

data3 = data2.groupBy(["Year", "CountryCode"])\
             .pivot("IndicatorCode2", columns)\
             .max("Value")

Хотя это успешно вернулось, data3.first() так и не вернул результат (я прервал работу на своем автономном компьютере, используя 3 ядра, через 10 минут).

Мой подход с использованием RDD и aggregateByKey работал хорошо, поэтому я не ищу решения о том, как это сделать, но могу ли поворот с DataFrames также помочь.


person Bernhard    schedule 16.02.2016    source источник


Ответы (2)


Что ж, поворот — не очень эффективная операция в целом, и вы мало что можете с этим поделать, используя DataFrame API. Однако вы можете попробовать repartition свои данные:

(data2
  .repartition("Year", "CountryCode")
  .groupBy("Year", "CountryCode")
  .pivot("IndicatorCode2", columns)
  .max("Value"))

или даже совокупность:

from pyspark.sql.functions import max

(df
    .groupBy("Year", "CountryCode", "IndicatorCode")
    .agg(max("Value").alias("Value"))
    .groupBy("Year", "CountryCode")
    .pivot("IndicatorCode", columns)
    .max("Value"))

перед применением pivot. Идея обоих решений одинакова. Вместо перемещения больших расширенных Rows перемещайте узкие плотные данные и расширяйте локально.

person zero323    schedule 16.02.2016
comment
Спасибо. Мне интересно, что что-то можно сделать на одном узле с пандами за 26 секунд (16 секунд загрузка/анализ csv и 10 секунд для распаковки) - person Bernhard; 17.02.2016
comment
... так сложно использовать искру с четырьмя из этих машин. С RDD у меня сейчас около 100 секунд - я постараюсь найти более быстрый способ в Spark. - person Bernhard; 17.02.2016
comment
Меня это не удивляет. Локальная обработка выполняется быстро, потому что она просто перемещает данные в памяти. Spark должен выполнять много работы, даже локально, и делать определенные предположения о данных, которые могут сделать этот тип операций неэффективным. - person zero323; 17.02.2016

В Spark 2.0 представлена ​​SPARK-13749 реализация поворота, которая быстрее для больших количество значений сводного столбца.

При тестировании с помощью Spark 2.1.0 на моем компьютере ваш пример теперь выполняется за 48 секунд.

person Andrew Ray    schedule 10.03.2017