У меня есть следующий фрейм данных PySpark (скажем, df
). В нем есть столбцы name
, timestamp
, category
и value
.
+------+-------------------+--------+-----+
| name| timestamp|category|value|
+------+-------------------+--------+-----+
| name1|2019-01-17 00:00:00| A|11.23|
| name2|2019-01-17 00:00:00| A|14.57|
| name3|2019-01-10 00:00:00| B| 2.21|
| name4|2019-01-10 00:00:00| B| 8.76|
| name5|2019-01-17 00:00:00| A|18.71|
| name6|2019-01-10 00:00:00| A|17.78|
| name7|2019-01-10 00:00:00| A| 5.52|
| name8|2019-01-10 00:00:00| A| 9.91|
| name9|2019-01-17 00:00:00| B| 1.16|
|name10|2019-01-17 00:00:00| B| 12.0|
+------+-------------------+--------+-----+
Я хочу добавить новый столбец в вышеупомянутый фрейм данных, который дает мне процентильное положение значений каждого имени в распределениях, которые включают элементы тех же category
и timestamp
.
Мой ожидаемый результат следующий:
+------+-------------------+--------+-----+---------+
|name |timestamp |category|value|pct_value|
+------+-------------------+--------+-----+---------+
|name1 |2019-01-17 00:00:00|A |11.23|1 |
|name10|2019-01-17 00:00:00|B |12.0 |2 |
|name2 |2019-01-17 00:00:00|A |14.57|2 |
|name3 |2019-01-10 00:00:00|B |2.21 |1 |
|name4 |2019-01-10 00:00:00|B |8.76 |2 |
+------+-------------------+--------+-----+---------+
only showing top 5 rows
Как лучше всего это сделать?
Я пробовал следующее:
import pyspark.sql.functions as F
from pyspark.sql import Window as W
w_cat = W.partitionBy('category', 'timestamp').orderBy("value")
df_new = ( df.select( '*', F.ntile(1000).over(w_cat).alias( 'pct_value' ) ) ).persist()
df_new.orderBy('name', 'timestamp').show(5,False)
Это дает правильный ожидаемый результат. Но этот метод занимает очень много времени (часы), когда я пробую его на своих фактических данных, которые содержат миллионы строк.
Вы можете сгенерировать фрейм данных, указанный выше (df
), используя приведенный ниже код:
Stats = Row("name", "timestamp", "category", "value")
stat1 = Stats('name1', "2019-01-17 00:00:00", "A", 11.23)
stat2 = Stats('name2', "2019-01-17 00:00:00", "A", 14.57)
stat3 = Stats('name3', "2019-01-10 00:00:00", "B", 2.21)
stat4 = Stats('name4', "2019-01-10 00:00:00", "B", 8.76)
stat5 = Stats('name5', "2019-01-17 00:00:00", "A", 18.71)
stat6 = Stats('name6', "2019-01-10 00:00:00", "A", 17.78)
stat7 = Stats('name7', "2019-01-10 00:00:00", "A", 5.52)
stat8 = Stats('name8', "2019-01-10 00:00:00", "A", 9.91)
stat9 = Stats('name9', "2019-01-17 00:00:00", "B", 1.16)
stat10 = Stats('name10', "2019-01-17 00:00:00", "B", 12.0)
stat_lst = [stat1 , stat2, stat3, stat4, stat5, stat6, stat7, stat8, stat9, stat10]
df = spark.createDataFrame(stat_lst)