Pyspark: определите столбец arrayType из структуры и вызовите udf для преобразования массива в строку

Я создаю ускоритель, который переносит данные из источника в пункт назначения. Например, я выберу данные из API и перенесу их в csv. У меня возникли проблемы с обработкой типа массива при преобразовании данных в csv. Я использовал методы withColumn и concat_ws (т. Е. df1 = df.withColumn ('movies', F.concat_ws (':', F.col (movies))) film - это тип массива столбец) для этого преобразования, и это сработало. Теперь я хотел, чтобы это происходило динамически. Я имею в виду, без указания имени столбца, есть ли способ выбрать имя столбца из структуры, которая имеет тип массива, а затем вызвать udf?

Спасибо за уделенное время!


person sam12    schedule 28.04.2021    source источник


Ответы (1)


Вы можете получить тип столбцов, используя df.schema. В зависимости от типа столбца вы можете применить concat_ws или нет:

data = [["test1", "test2", [1,2,3], ["a","b","c"]]]
schema= ["col1", "col2", "arr1", "arr2"]
df = spark.createDataFrame(data, schema)

array_cols = [F.concat_ws(":", c.name).alias(c.name) \
    for c in df.schema if isinstance(c.dataType, T.ArrayType) ]
other_cols = [F.col(c.name) \
    for c in df.schema if not isinstance(c.dataType, T.ArrayType) ]

df = df.select(other_cols + array_cols)

Результат:

+-----+-----+-----+-----+
| col1| col2| arr1| arr2|
+-----+-----+-----+-----+
|test1|test2|1:2:3|a:b:c|
+-----+-----+-----+-----+
person werner    schedule 29.04.2021