У меня есть приложение ETL, использующее Spark 1.3.1, Amazon S3 и EMR 3.8. Мне нужно мое приложение для сохранения Dataframe в нескольких разделах.
Согласно документации Spark 1.3.1, это способ разбиения вашего файла паркета:
df1.save("data/test_table/key=1", "parquet")
Но проблема в том, что у моего RDD много данных и их нужно сохранять на отдельные разделы. (в группе)
Я не могу найти правильную логику, чтобы эта программа работала быстро (или не так медленно)
Некоторые попытки:
#Get All RDD Partition's
AllPartitions = RDD.map(lambda x: x[0]).distinct()
#For all partitions save filtering
for part in AllPartitions.collect():
filteredDF = df.filter(df.recordOpeningDate == part)
df.save(path=outputFilePath + "/FIELD=" + part, source='parquet',mode='append')
Попытка использовать GroupBy
def Mapping(line):
return (
line[0] , [
#Definition of my RDD
])
def SavePartitions(KV):
#WRONG ! I cant create a DF inside a transformation
df = sqlContext.createDataFrame(KV, SCHEMA)
df.save(...)
RDD = RDD.map(Mapping).groupByKey().mapValues(SavePartitions)