Я попробовал новую функцию «поворота» версии 1.6 на больший набор данных. Он имеет 5 656 458 строк, а столбец IndicatorCode
содержит 1344 различных кода.
Идея заключалась в том, чтобы использовать свод для «распаковки» (в терминах pandas) этого набора данных и иметь столбец для каждого IndicatorCode.
schema = StructType([ \
StructField("CountryName", StringType(), True), \
StructField("CountryCode", StringType(), True), \
StructField("IndicatorName", StringType(), True), \
StructField("IndicatorCode", StringType(), True), \
StructField("Year", IntegerType(), True), \
StructField("Value", DoubleType(), True) \
])
data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv',
format='com.databricks.spark.csv',
header='true',
schema=schema)
data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", "\.", "_"))\
.select(["CountryCode", "IndicatorCode2", "Year", "Value"])
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]
data3 = data2.groupBy(["Year", "CountryCode"])\
.pivot("IndicatorCode2", columns)\
.max("Value")
Хотя это успешно вернулось, data3.first()
так и не вернул результат (я прервал работу на своем автономном компьютере, используя 3 ядра, через 10 минут).
Мой подход с использованием RDD
и aggregateByKey
работал хорошо, поэтому я не ищу решения о том, как это сделать, но могу ли поворот с DataFrames также помочь.