Как с помощью PySpark заполнить значения в столбце на основе группы/окна/раздела и выполнить UDF?

Я пытаюсь заполнить пропущенные значения в столбце. Столбец профиля либо в 1-й строке, либо в любой из следующих строк (которые расположены в зависимости от даты) в группе/разделе будет иметь значение, которое должно быть заполнено в ячейках ниже в столбце профиля.

Я попытался запустить его с оконной функцией, но не смог применить UDF к оконной функции.

valuesA = [('1',"", "20190108"),('1',"", "20190107"),('1',"abcd", "20190106"),('1',"", "20190105"),('1',"", "20190104"),('2',"wxyz", "20190103"),('2',"", "20190102"),('2',"", "20190101")]
TableA = spark.createDataFrame(valuesA,['vid','profile', 'date'])

valuesB = [('1',"null", "20190108"),('1',"null", "20190107"),('1',"abcd", "20190106"),('1',"abcd", "20190105"),('1',"abcd", "20190104"),('2',"wxyz", "20190103"),('2', "wxyz", "20190102"),('2', "wxyz", "20190101")]
TableB = spark.createDataFrame(valuesB,['vid','profile', 'date'])

TableA.show()
TableB.show()
Table A: This is what I have. 
+---+-------+--------+
|vid|profile|    date|
+---+-------+--------+
|  1|       |20190108|
|  1|       |20190107|
|  1|   abcd|20190106|
|  1|       |20190105|
|  1|       |20190104|
|  2|   wxyz|20190103|
|  2|       |20190102|
|  2|       |20190101|
+---+-------+--------+

Table B: What I am expecting. 
+---+-------+--------+
|vid|profile|    date|
+---+-------+--------+
|  1|   null|20190108|
|  1|   null|20190107|
|  1|   abcd|20190106|
|  1|   abcd|20190105|
|  1|   abcd|20190104|
|  2|   wxyz|20190103|
|  2|   wxyz|20190102|
|  2|   wxyz|20190101|
+---+-------+--------+


person user422930    schedule 07.06.2019    source источник


Ответы (1)


Вы можете использовать last оконную функцию. Примечание. Сначала withColumn заменяет все пустые строки нулями - функция last по умолчанию пропускает нули, что в данном случае и требуется.

from pyspark.sql.window import Window
from pyspark.sql.functions import *
TableB = TableA.withColumn('profile', when(length('profile') == 0, lit(None)).otherwise(col('profile')))\
    .withColumn("profile", last(col('profile'), True).over(Window.partitionBy('vid').orderBy(col('date').desc())))

TableB.show()

Выход:

+---+-------+--------+
|vid|profile|    date|
+---+-------+--------+
|  1|   null|20190108|
|  1|   null|20190107|
|  1|   abcd|20190106|
|  1|   abcd|20190105|
|  1|   abcd|20190104|
|  2|   wxyz|20190103|
|  2|   wxyz|20190102|
|  2|   wxyz|20190101|
+---+-------+--------+
person mrjoseph    schedule 07.06.2019
comment
Большое спасибо, это сработало хорошо. Аргумент ignorenulls в методе last был приятным штрихом. Вот чего мне не хватало. - person user422930; 07.06.2019