Я хотел бы выполнить операцию типа заполнения в кадре данных, чтобы удалить нули и убедиться, что последняя строка является своего рода сводной строкой, содержащей последние известные значения для каждого столбца на основе timestamp
, сгруппированные по itemId
. Поскольку я использую записные книжки Azure Synapse, языком может быть Scala, Pyspark, SparkSQL или даже C#. Однако проблема здесь в том, что реальное решение имеет до миллионов строк и сотен столбцов, поэтому мне нужно динамическое решение, которое может использовать преимущества Spark. Мы можем предоставить большой кластер, чтобы убедиться, что мы используем его в полной мере?
Пример данных:
// Assign sample data to dataframe
val df = Seq(
( 1, "10/01/2021", 1, "abc", null ),
( 2, "11/01/2021", 1, null, "bbb" ),
( 3, "12/01/2021", 1, "ccc", null ),
( 4, "13/01/2021", 1, null, "ddd" ),
( 5, "10/01/2021", 2, "eee", "fff" ),
( 6, "11/01/2021", 2, null, null ),
( 7, "12/01/2021", 2, null, null )
).
toDF("eventId", "timestamp", "itemId", "attrib1", "attrib2")
df.show
Ожидаемые результаты со строками 4 и 7 в качестве сводных строк:
+-------+----------+------+-------+-------+
|eventId| timestamp|itemId|attrib1|attrib2|
+-------+----------+------+-------+-------+
| 1|10/01/2021| 1| abc| null|
| 2|11/01/2021| 1| abc| bbb|
| 3|12/01/2021| 1| ccc| bbb|
| 4|13/01/2021| 1| ccc| ddd|
| 5|10/01/2021| 2| eee| fff|
| 6|11/01/2021| 2| eee| fff|
| 7|12/01/2021| 2| eee| fff|
+-------+----------+------+-------+-------+
Я рассмотрел этот вариант, но у меня возникли проблемы с его адаптацией для моего варианта использования.
Spark/Scala: прямое заполнение последним наблюдением
У меня есть работающее решение SparkSQL, но оно будет очень подробным для большого объема столбцов, надеясь на что-то более простое в обслуживании:
%%sql
WITH cte (
SELECT
eventId,
itemId,
ROW_NUMBER() OVER( PARTITION BY itemId ORDER BY timestamp ) AS rn,
attrib1,
attrib2
FROM df
)
SELECT
eventId,
itemId,
CASE rn WHEN 1 THEN attrib1
ELSE COALESCE( attrib1, LAST_VALUE(attrib1, true) OVER( PARTITION BY itemId ) )
END AS attrib1_xlast,
CASE rn WHEN 1 THEN attrib2
ELSE COALESCE( attrib2, LAST_VALUE(attrib2, true) OVER( PARTITION BY itemId ) )
END AS attrib2_xlast
FROM cte
ORDER BY eventId