Как создать новые столбцы на основе декартова произведения нескольких столбцов из фреймов данных pyspark

Позвольте мне на простом примере объяснить, что я пытаюсь сделать. допустим, у нас есть два очень простых фрейма данных, как показано ниже:

Df1
+---+---+---+
| a1| a2| a3|
+---+---+---+
|  2|  3|  7|
|  1|  9|  6|
+---+---+---+

Df2
+---+---+
| b1| b2|
+---+---+
| 10|  2|
|  9|  3|
+---+---+

Из df1, df2 нам нужно создать новый df со столбцами, которые являются декартовым произведением исходных столбцов из df1, df2. В частности, новый df будет иметь ‘a1b1’, ’a1b2’, ’a2b1’, ’a2b2’, ’a3b1’, ’a3b2’, а строки будут умножением соответствующих столбцов из df1, df2. Результат df должен выглядеть следующим образом:

Df3
+----+----+----+----+----+----+
|a1b1|a1b2|a2b1|a2b2|a3b1|a3b2|
+----+----+----+----+----+----+
|  20|   4|  30|   6|  70|  14|
|   9|   3|  81|  27|  54|  18|
+----+----+----+----+----+----+

Я искал искровые онлайн-документы, а также вопросы, размещенные здесь, но похоже, что они все о декартовом произведении строк, а не столбцов. Например, rdd.cartesian () предоставляет декартово произведение различных комбинаций значений в строке, как в следующем коде:

r = sc.parallelize([1, 2])
r.cartesian(r).toDF().show()

+---+---+
| _1| _2|
+---+---+
|  1|  1|
|  1|  2|
|  2|  1|
|  2|  2|
+---+---+

Но это не то, что мне нужно. Опять же, мне нужно создавать новые столбцы вместо строк. В моей проблеме количество строк останется таким же. Я понимаю, что udf может в конечном итоге решить проблему. Однако в моем реальном приложении у нас есть огромный набор данных, который занимает слишком много времени для создания всех столбцов (около 500 новых столбцов как все возможные комбинации столбцов). мы предпочитаем иметь некоторые виды векторных операций, которые могут повысить эффективность. Возможно, я ошибаюсь, но похоже, что spark udf основан на строковых операциях, что может быть причиной того, что на его завершение ушло так много времени.

Большое спасибо за любые предложения / отзывы / комментарии.

Для вашего удобства я прикрепил сюда простой код для создания примеров фреймов данных, показанных выше:

df1 = sqlContext.createDataFrame([[2,3,7],[1,9,6]],['a1','a2','a3'])
df1.show()

df2 = sqlContext.createDataFrame([[10,2],[9,3]],['b1','b2'])
df2.show()

person spectrum    schedule 17.02.2017    source источник
comment
Как связать строки? Порядок - это не то, на что в целом можно положиться.   -  person zero323    schedule 18.02.2017
comment
Привет Zero323, спасибо за ваше сообщение. У нас есть первичный ключ для связывания строк. Здесь давайте просто предположим, что строки совпадают по целочисленным индексам, и все фреймы данных имеют одинаковое количество строк.   -  person spectrum    schedule 18.02.2017
comment
Хорошо, так что совет от профессионала: наличие явного ключа - это хорошо. В зависимости от индексов нет :) В общем df1.join(df2, ['id']).select([df1[x] * df2[y] for x in df1.columns for y in df2.columns if x != 'id' and y != 'id']), когда id - это столбец связи.   -  person zero323    schedule 18.02.2017
comment
Привет, zero323, твой профессиональный код работает хорошо, чувак :) Однако имена столбцов сгенерированы не так, как я хочу. Хотя я легко могу их переименовать. большое спасибо!   -  person spectrum    schedule 21.02.2017


Ответы (1)


Насколько я знаю, это не так просто. Вот пример использования eval:

# function to add rownumbers in a dataframe
def addrownum(df):
    dff = df.rdd.zipWithIndex().toDF(['features','rownum'])
    odf = dff.map(lambda x : tuple(x.features)+tuple([x.rownum])).toDF(df.columns+['rownum'])
    return odf

df1_ = addrownum(df1)
df2_ = addrownum(df2)
# Join based on rownumbers
outputdf = df1_.rownum.join(df2_,df1_.rownum==df2_.rownum).drop(df1_.rownum).drop(df2_.rownum)

n1 = ['a1','a2','a3']  # columns in set1
n2 = ['b1','b2']       # columns in set2

# I create a string of expression that I want to execute
eval_list = ['x.'+l1+'*'+'x.'+l2 for l1 in n1 for l2 in n2]
eval_str = '('+','.join(eval_list)+')'
col_list = [l1+l2 for l1 in n1 for l2 in n2] 

dfcartesian = outputdf.map(lambda x:eval(eval_str)).toDF(col_list)

Еще кое-что, что может вам помочь, - это Elementwise Product в spark.ml.feature, но он будет не менее сложным. Вы берете элементы из одного списка, состоящего из нескольких элементов, в другой список и расширяете векторы функций обратно в фрейм данных.

person Gaurav Dhama    schedule 17.02.2017
comment
Привет, спасибо за ответ. Опять же, метод, который вы используете, - это строковая операция, которая очень медленная для огромного набора данных. Кроме того, поэлементное произведение в mllib не работает, потому что он использует отдельный весовой вектор для умножения ячейки массива в строке. - person spectrum; 22.02.2017