От разреженного по столбцу до плотного массива в pyspark

У меня есть два фрейма данных, из которых мне нужно получить информацию, чтобы сгенерировать третий. Первый фрейм данных содержит информацию об итерациях элемента пользователем, например,

+-----+-----------+-----------+
|user | itemId    |date       |
+-----+-----------+-----------+
|1    | 10005880  |2019-07-23 |
|2    | 10005903  |2019-07-23 |
|3    | 10005903  |2019-07-23 |
|1    | 12458442  |2019-07-23 |
|1    | 10005903  |2019-07-26 |
|3    | 12632813  |2019-07-26 |
|2    | 12632813  |2019-07-26 |
+-----+-----------+-----------+

У него нет определенного порядка, и у каждого пользователя есть несколько строк. Второй фрейм данных - это просто список элементов с индексом, например,

+-----------+-----------+
| itemId    |index      |
+-----------+-----------+
| 10005880  |1          |
| 10005903  |2          |
| 12458442  |3          |
|    ...    |   ...     |
| 12632813  |2000000    |
+-----------+-----------+

Этот фрейм данных довольно длинный, и не каждый элемент представлен во фрейме данных взаимодействия элементов. Что необходимо, так это третий фрейм данных, где каждая строка содержит векторизованное представление взаимодействий элементов пользователя в виде массива в пределах одного столбца, например,

+-----+--------------------+
|user |  interactions      |
+-----+--------------------+
|1    |  <1, 1, 1, ..., 0> |                        
|2    |  <0, 1, 0, ..., 1> |                         
|3    |  <0, 1, 0, ..., 1> |                            
+-----+--------------------+

Где массив имеет 1, если пользователь взаимодействовал с элементом по этому индексу, иначе 0. Есть ли простой способ сделать это в pyspark?


person Kyle.    schedule 08.01.2020    source источник
comment
Что должен содержать interactions вектор? Список индексов из второго DataFrame? В вашем примере не очень понятно   -  person blackbishop    schedule 09.01.2020
comment
Извините, да, это правильно. 1, если пользователь взаимодействовал с элементом по этому индексу, в противном случае - 0.   -  person Kyle.    schedule 09.01.2020
comment
Обновлен вопрос, чтобы отразить это, @blackbishop!   -  person Kyle.    schedule 09.01.2020
comment
@ Кайл. сколько элементов вы ожидаете в каждом массиве столбца interaction, 2 000 000+ элементов?   -  person jxc    schedule 10.01.2020
comment
Хороший вопрос, @jxc! Не так много, но в идеале решение должно масштабироваться до нескольких сотен тысяч.   -  person Kyle.    schedule 10.01.2020


Ответы (3)


IIUC, вы можете использовать pyspark.ml.feature. CountVectorizer, чтобы помочь создать желаемый вектор. Предположим, что df1 - это первый фрейм данных (пользователь, itemId и дата), а df2 - второй фрейм данных (itemId и индекс):

from pyspark.ml.feature import CountVectorizerModel
from pyspark.sql.functions import collect_set

df3 = df1.groupby('user').agg(collect_set('itemId').alias('items_arr'))

# set up the vocabulary from the 2nd dataframe and then create CountVectorizerModel from this list
# set binary=True so that this is doing the same as OneHotEncoder
voc = [ r.itemId for r in df2.select('itemId').sort('index').collect() ]
model = CountVectorizerModel.from_vocabulary(voc, inputCol='items_arr', outputCol='items_vec', binary=True)

df_new = model.transform(df3)
df_new.show(truncate=False)
+----+------------------------------+-------------------------+
|user|items_arr                     |items_vec                |
+----+------------------------------+-------------------------+
|3   |[10005903, 12632813]          |(4,[1,2],[1.0,1.0])      |
|1   |[10005903, 12458442, 10005880]|(4,[0,1,3],[1.0,1.0,1.0])|
|2   |[10005903, 12632813]          |(4,[1,2],[1.0,1.0])      |
+----+------------------------------+-------------------------+

Это создает SparseVector, если вам нужен столбец ArrayType, вам понадобится udf:

from pyspark.sql.functions import udf
udf_to_array = udf(lambda v: [*map(int, v.toArray())], 'array<int>')

df_new.withColumn('interactions', udf_to_array('items_vec')).show(truncate=False)
+----+------------------------------+-------------------------+------------+
|user|items_arr                     |items_vec                |interactions|
+----+------------------------------+-------------------------+------------+
|3   |[10005903, 12632813]          |(4,[1,2],[1.0,1.0])      |[0, 1, 1, 0]|
|1   |[10005903, 12458442, 10005880]|(4,[0,1,3],[1.0,1.0,1.0])|[1, 1, 0, 1]|
|2   |[10005903, 12632813]          |(4,[1,2],[1.0,1.0])      |[0, 1, 1, 0]|
+----+------------------------------+-------------------------+------------+
person jxc    schedule 10.01.2020

Попробуй это! При необходимости вы также можете изменить или внести какие-либо исправления.

from pyspark.sql.functions import col, when, arrays_zip

userIndexes = users.join(items, users.itemId == items.itemId, 'left').crosstab('user', 'index')

cols = userIndexes.columns.filter(_ != 'user')

userIndexes.select('user', arrays_zip([when(col(c).isNull(), lit(0)).otherwise(lit(1)) for c in cols]).alias('interactions')).show()

Наслаждайтесь и ура!

Обновление: Установить конфигурацию Spark:

var sparkConf: SparkConf = null
sparkConf = new SparkConf()
.set("spark.sql.inMemoryColumnarStorage.batchSize", 36000)

Настройка производительности

person OO7    schedule 10.01.2020
comment
Я получаю сообщение об ошибке, что количество различных значений индекса не может превышать 10000, но у меня их около 36000! Может быть, есть способ обойти это ... пока голосование за, но я приму ваше решение, если смогу его проверить! - person Kyle.; 10.01.2020
comment
Возможно, это вызвано параметром spark.sql.inMemoryColumnarStorage.batchSize в конфигурации Spark. Итак, я обновил свой ответ, рассказав, как это настроить. - person OO7; 11.01.2020

Вы можете присоединиться к 2 DataFrames, а затем собрать список групп индексов по user.

df_users_items = df_users.join(df_items, ["itemId"], "left")

df_user_interations = df_users_items.groupBy("user").agg(collect_set("index").alias("interactions"))

Теперь с помощью массива индексов создайте новый массив interactions, например:

max_index = df_items.select(max(col("index")).alias("max_index")).first().max_index
interactions_col = array(
    *[when(array_contains("interactions", i + 1), lit(1)).otherwise(lit(0)) for i in range(max_index)])

df_user_interations.withColumn("interactions", interactions_col).show()
person blackbishop    schedule 09.01.2020
comment
Мне сложно заставить это работать так, как написано, но это может быть хорошей отправной точкой ... Я посмотрю, смогу ли я выяснить, что не работает, и дам вам знать, чтобы вы могли обновить это! - person Kyle.; 09.01.2020
comment
Я получаю сообщение об ошибке max (col (index)), поскольку столбцы не могут быть повторены ... это функция col (), которую вы используете из пакета pyspark.sql.functions? - person Kyle.; 09.01.2020