pyspark collect_set или collect_list с помощью groupby

Как я могу использовать collect_set или collect_list в фрейме данных после groupby. например: df.groupby('key').collect_set('values'). Я получаю сообщение об ошибке: AttributeError: 'GroupedData' object has no attribute 'collect_set'


person Hanan Shteingart    schedule 02.06.2016    source источник
comment
Можете ли вы опубликовать образцы данных, которые вызовут эту ошибку, чтобы мы могли отладить вашу проблему?   -  person Katya Handler    schedule 02.06.2016


Ответы (2)


Вам нужно использовать агг. Пример:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

df.show()

+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

Обратите внимание, что в приведенном выше примере вам необходимо создать HiveContext. См. https://stackoverflow.com/a/35529093/690430 для работы с различными версиями Spark.

(df
  .groupby("id")
  .agg(F.collect_set("code"),
       F.collect_list("name"))
  .show())

+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+
person Kamil Sindi    schedule 27.06.2016
comment
collect_set () содержит отдельные элементы, а collect_list () содержит все элементы (кроме нулей) - person Grant Shannon; 03.05.2018
comment
Функция size в collect_set или collect_list будет лучше для вычисления значения счетчика или использования простой функции счета. Я использую окно, чтобы получить количество транзакций, связанных с учетной записью. - person user3858193; 06.05.2018
comment
Как получить вывод collect_list как dict, когда у меня есть несколько столбцов внутри списка, например: agg (collect_list (struct (df.f1, df.f2, df.f3))). Вывод должен быть [f1: значение, f2: значение, f3: значение] для каждой группы. - person Immanuel Fredrick; 12.03.2019

Если ваш фрейм данных большой, вы можете попробовать использовать pandas udf (GROUPED_AGG), чтобы избежать ошибки памяти. К тому же это намного быстрее.

Сгруппированные агрегированные пользовательские функции Pandas аналогичны агрегатным функциям Spark. Сгруппированные агрегированные пользовательские функции Pandas используются с groupBy (). Agg () и pyspark.sql.Window. Он определяет агрегацию от одной или нескольких pandas.Series до скалярного значения, где каждая pandas.Series представляет столбец в группе или окне. панды udf

пример:

import pyspark.sql.functions as F

@F.pandas_udf('string', F.PandasUDFType.GROUPED_AGG)
def collect_list(name):
    return ', '.join(name)

grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))
person Allen    schedule 01.10.2019
comment
Я не думаю, что пользовательский UDF быстрее встроенной искры - person jwdink; 18.10.2019
comment
Я знаю, что UDF pandas намного медленнее, чем встроенный искр (а также, что UDF pandas требует больше памяти от вашего кластера)! Что быстрее, чистый java / scala или java, который должен вызывать python в структуре данных, которая также должна быть сериализована с помощью стрелки в pandas DF? - person Marco; 09.05.2020