Заполнение Spark Dataframe

Я хотел бы выполнить операцию типа заполнения в кадре данных, чтобы удалить нули и убедиться, что последняя строка является своего рода сводной строкой, содержащей последние известные значения для каждого столбца на основе timestamp, сгруппированные по itemId. Поскольку я использую записные книжки Azure Synapse, языком может быть Scala, Pyspark, SparkSQL или даже C#. Однако проблема здесь в том, что реальное решение имеет до миллионов строк и сотен столбцов, поэтому мне нужно динамическое решение, которое может использовать преимущества Spark. Мы можем предоставить большой кластер, чтобы убедиться, что мы используем его в полной мере?

Пример данных:

// Assign sample data to dataframe
val df = Seq(
    ( 1, "10/01/2021", 1, "abc", null ),
    ( 2, "11/01/2021", 1, null, "bbb" ),
    ( 3, "12/01/2021", 1, "ccc", null ),
    ( 4, "13/01/2021", 1, null, "ddd" ),

    ( 5, "10/01/2021", 2, "eee", "fff" ),
    ( 6, "11/01/2021", 2, null, null ),
    ( 7, "12/01/2021", 2, null, null )
    ).
    toDF("eventId", "timestamp", "itemId", "attrib1", "attrib2")

df.show

Ожидаемые результаты со строками 4 и 7 в качестве сводных строк:

+-------+----------+------+-------+-------+
|eventId| timestamp|itemId|attrib1|attrib2|
+-------+----------+------+-------+-------+
|      1|10/01/2021|     1|    abc|   null|
|      2|11/01/2021|     1|    abc|    bbb|
|      3|12/01/2021|     1|    ccc|    bbb|
|      4|13/01/2021|     1|    ccc|    ddd|
|      5|10/01/2021|     2|    eee|    fff|
|      6|11/01/2021|     2|    eee|    fff|
|      7|12/01/2021|     2|    eee|    fff|
+-------+----------+------+-------+-------+

Я рассмотрел этот вариант, но у меня возникли проблемы с его адаптацией для моего варианта использования.

Spark/Scala: прямое заполнение последним наблюдением

У меня есть работающее решение SparkSQL, но оно будет очень подробным для большого объема столбцов, надеясь на что-то более простое в обслуживании:

%%sql
WITH cte (
SELECT
    eventId,
    itemId,
    ROW_NUMBER() OVER( PARTITION BY itemId ORDER BY timestamp ) AS rn,
    attrib1,
    attrib2
FROM df
)
SELECT
    eventId,
    itemId,
    CASE rn WHEN 1 THEN attrib1 
        ELSE COALESCE( attrib1, LAST_VALUE(attrib1, true) OVER( PARTITION BY itemId ) ) 
    END AS attrib1_xlast,
    CASE rn WHEN 1 THEN attrib2 
        ELSE COALESCE( attrib2, LAST_VALUE(attrib2, true) OVER( PARTITION BY itemId ) ) 
    END AS attrib2_xlast
    
FROM cte
ORDER BY eventId

person wBob    schedule 12.04.2021    source источник


Ответы (1)


Для многих columns вы можете создать expression, как показано ниже.

val window = Window.partitionBy($"itemId").orderBy($"timestamp")

// Instead of selecting columns you could create a list of columns 
val expr = df.columns
  .map(c => coalesce(col(c), last(col(c), true).over(window)).as(c))

df.select(expr: _*).show(false)

Обновлять:

val mainColumns = df.columns.filterNot(_.startsWith("attrib"))
val aggColumns = df.columns.diff(mainColumns).map(c => coalesce(col(c), last(col(c), true).over(window)).as(c))

df.select(( mainColumns.map(col) ++ aggColumns): _*).show(false)

Результат:

+-------+----------+------+-------+-------+
|eventId|timestamp |itemId|attrib1|attrib2|
+-------+----------+------+-------+-------+
|1      |10/01/2021|1     |abc    |null   |
|2      |11/01/2021|1     |abc    |bbb    |
|3      |12/01/2021|1     |ccc    |bbb    |
|4      |13/01/2021|1     |ccc    |ddd    |
|5      |10/01/2021|2     |eee    |fff    |
|6      |11/01/2021|2     |eee    |fff    |
|7      |12/01/2021|2     |eee    |fff    |
+-------+----------+------+-------+-------+
person koiralo    schedule 13.04.2021
comment
Есть ли способ исключить определенные столбцы? - person wBob; 13.04.2021
comment
Вы можете drop столбцы как df.columns.drop(0) с индексом или можете использовать filterNot df.columns.filterNot(_.endsWith("eventId")) - person koiralo; 14.04.2021
comment
Ах, хорошо, я имею в виду, что хочу запустить заполнение только для определенных столбцов и создать результирующий фрейм данных, в котором некоторые столбцы не затронуты, а к некоторым применено заполнение, имеет смысл? Если вы посмотрите на мой пример SQL, вы увидите, что eventId, timestamp и itemId остались нетронутыми, а изменены только attrib1 и 2. Что-то вроде «заполнять только столбцы [attrib1, attrib2]» или «заполнять только столбцы, начинающиеся с атрибута». Есть смысл? - person wBob; 14.04.2021
comment
Пожалуйста, проверьте обновление. Вы также можете определить собственный список полей вместо подготовки таким образом. - person koiralo; 14.04.2021
comment
Спасибо за обновления. Не бойтесь голосовать за вопрос, если он показался вам интересным! - person wBob; 15.04.2021