Мудрый рейтинг в PySpark

Мои данные искры выглядят так -

area           product           score          
a               aa                 .39
a               bb                 .03
a               cc                 1.1
a               dd                 .5
b               ee                 .02
b               aa                 1.2
b               mm                  .5
b               bb                 1.3

Я хочу, чтобы топ-3 продуктов были в рейтинге на основе переменной оценки. Мой окончательный результат должен быть

area           product           score          rank
a               cc                 1.1            1
a               dd                 .5             2 
a               a                  .39            3
b               bb                 1.3            1 
b               aa                 1.2            2
b               mm                  .5            3

Как это сделать в PySpark?

Я сделал до сих пор -

from pyspark.sql import Window
import pyspark.sql.functions as psf
wA = Window.orderBy(psf.desc("score"))
df = df.withColumn(
    "rank", 
    psf.dense_rank().over(wA))

Но у меня не работает.


person John Davis    schedule 27.11.2019    source источник
comment
добавьте partitionBy в: wA = Window.partitionBy('area').orderBy(psf.desc("score")) и убедитесь, что счет - это числовой столбец.   -  person jxc    schedule 27.11.2019


Ответы (1)


Разделение по area и фильтр rank<=3 даст результаты

import pyspark.sql.functions as psf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Test").master("local[*]") \
    .getOrCreate()
df = spark.createDataFrame([('a', 'aa', .39),
                            ('a', 'bb', .03),
                            ('a', 'cc', 1.1),
                            ('a', 'dd', .5),
                            ('b', 'ee', .02),
                            ('b', 'aa', 1.2),
                            ('b', 'mm', .5),
                            ('b', 'bb', 1.3)],
                           ['area', 'product', 'score'])

wA = Window.partitionBy("area").orderBy(psf.desc("score"))
df = df.withColumn("rank",
                   psf.dense_rank().over(wA))
df.filter("rank<=3").show()
person nalanagula    schedule 27.11.2019