Добавление элемента списка в качестве столбца в существующий фрейм данных pyspark

У меня есть список lists=[0,1,2,3,5,6,7]. Порядок не является последовательным. У меня есть фреймворк pyspark с 9 столбцами.

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|    NaN|           NaN|    1| NaN |

Мне нужно добавить свои списки в виде столбца в существующий фрейм данных. Мои списки не в порядке, поэтому я не могу использовать udf. Есть ли способ сделать это? Пожалуйста, помогите мне, я хочу, чтобы это было так

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+------+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|lists |
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+-------+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|0     |
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|           NaN|    1| NaN |1     |

person user-2147482338    schedule 01.10.2019    source источник
comment
не могли бы вы показать нам, как бы вы хотели добавить этот список в существующий фрейм данных?   -  person vikrant rana    schedule 01.10.2019
comment
Я добавил, как должен выглядеть фрейм данных. Я добавил только две строки, но основная проблема заключается в том, чтобы добавить столбец в мой фрейм данных, и у меня есть списки.   -  person user-2147482338    schedule 01.10.2019
comment
Не повторять, а просто добавил мои списки значений в качестве нового столбца в мой существующий фрейм данных.   -  person user-2147482338    schedule 01.10.2019
comment
Возможный дубликат Как мне добавить новый столбец в фрейм данных Spark (используя PySpark)?   -  person Daniel    schedule 01.10.2019
comment
@ Дэниел, это другой вопрос. У меня есть список, и я хочу добавить его в качестве столбца в свой фреймворк   -  person user-2147482338    schedule 01.10.2019
comment
Следует ли назначить первый элемент вашего списка строке с самой ранней датой (т. Е. Упорядочен ли фрейм данных по дате)?   -  person cronoik    schedule 01.10.2019
comment
@cronoik. Он упорядочен по дате. Предположим, у меня есть фрейм данных с Row = 7 и column = 9. У меня есть список, длина которого равна 7. Мне нужно добавить этот список в качестве нового столбца в мой фрейм данных. Таким образом, мой результирующий фрейм данных будет иметь 10 столбцов.   -  person user-2147482338    schedule 01.10.2019
comment
Я только что видел, что у вас в столбце индекса. Является ли он последовательным, начиная с 0, и могу ли я также сказать, что первый индекс вашего списка принадлежит первой строке с индексом 0? Я задаю этот вопрос, потому что фреймы данных pyspark не упорядочены (например, pandas), и для проведения такой операции требуется столбец, который позволяет вам упорядочить фрейм данных.   -  person cronoik    schedule 01.10.2019
comment
Будет ли работать что-то подобное ниже? list = [(1, 'DEF'), (2, 'KLM')] df = spark.createDataFrame (list, ['id', 'value']) lists = [5,6] rdd = sc.parallelize ( списки) df = df.rdd.zip (rdd) .map (лямбда x: (x [0] [0], x [0] [1], x [1])). toDF ([id, Value, index ])   -  person vikrant rana    schedule 01.10.2019
comment
вы можете создать rdd из данного списка и заархивировать его с существующим фреймом данных и использовать для него операцию карты. но указанные элементы списка и строки фрейма данных должны быть одинаковыми для вышеуказанного метода.   -  person vikrant rana    schedule 01.10.2019
comment
Спасибо, но вы можете объяснить x[0][0],x[0][1],x[1]. Это динамично ?. Решение должно работать для любого количества столбцов. длина списков равна строкам фрейма данных.   -  person user-2147482338    schedule 03.10.2019
comment
@ cronoik .. да только подряд. но элементы в списках не последовательные.   -  person user-2147482338    schedule 03.10.2019
comment
@ user-2147482338. если длина строк списка и фрейма данных равна, выше будет работать для любого количества столбцов. вам просто нужно включить каждый элемент столбца в функцию карты.   -  person vikrant rana    schedule 03.10.2019
comment
или можно использовать некоторое понимание списка, чтобы исключить кодирование его отдельно для каждого элемента.   -  person vikrant rana    schedule 03.10.2019
comment
@vikrantrana большое спасибо. так x[0][0],x[0][1],x[1] будет работать для любого количества столбцов, верно?   -  person user-2147482338    schedule 03.10.2019
comment
Нет, это было специально для моего фрейма данных, который имеет два столбца. вам может потребоваться включить свои столбцы в качестве элемента или можно использовать некоторые понимания списков. Я попробую через некоторое время.   -  person vikrant rana    schedule 03.10.2019


Ответы (2)


Не слишком уверен, должно ли это быть что-то вроде этого или вы ожидали чего-то еще. Если у вас должно быть одинаковое количество элементов списка и строк фрейма данных, то вот простой подход.

Для данного образца кадра данных с тремя столбцами:

 l = [(1,'DEF',33),(2,'KLM',22),(3,'ABC',32),(4,'XYZ',77)]
 df=spark.createDataFrame(l, ['id', 'value','age'])

Допустим, вот список:

lists=[5,6,7,8]

Можно создать rdd из этого списка и использовать функцию zip с фреймом данных и использовать функцию карты поверх него.

listrdd = sc.parallelize(lists)

newdf=df.rdd.zip(listrdd).map(lambda (x,y ) : ([x for x in x] + [y])).toDF(["id", "Value",",age","List_element"])

>>> ziprdd=df.rdd.zip(listrdd)
>>> ziprdd.take(50)
[(Row(id=1, value=u'DEF', age=33), 5), (Row(id=2, value=u'KLM', age=22), 6), (Row(id=3, value=u'ABC', age=32), 7), (Row(id=4, value=u'XYZ', age=77), 8)]

Поскольку функция zip возвращает пары значений ключа, первый элемент которых содержит данные из первого rdd, а второй элемент содержит данные из второго rdd. Я использую понимание списка для первого элемента и объединяю его со вторым элементом.

Он динамический и может работать для n столбцов, но элементы списка и строки фрейма данных должны быть одинаковыми.

>>> newdf.show()
]+---+-----+----+------------+
| id|Value|,age|List_element|
+---+-----+----+------------+
|  1|  DEF|  33|           5|
|  2|  KLM|  22|           6|
|  3|  ABC|  32|           7|
|  4|  XYZ|  77|           8|
+---+-----+----+------------+

Примечание. Количество разделов rdd должно быть одинаковым для использования метода zip, иначе вы получите сообщение об ошибке.

ValueError: Can only zip with RDD which has the same number of partitions
person vikrant rana    schedule 03.10.2019
comment
Большое спасибо, я этого ожидал. Я попробую это. Спасибо за помощь. - person user-2147482338; 04.10.2019
comment
Я получаю ValueError: Can only zip with RDD which has the same number of partitions. - person user-2147482338; 04.10.2019
comment
Я нахожу это _1 _ .. Есть идеи, как это решить? - person user-2147482338; 04.10.2019
comment
да. каково количество разделов для вашего списка и фрейма данных. Он должен быть таким же для функции zip. - person vikrant rana; 04.10.2019
comment
может быть, мы можем оставить раздел таким же, или нам нужно искать другой подход .. один из возможных способов использования zipwithindex и присоединения по индексным ключам .. Просто подумайте - person vikrant rana; 04.10.2019
comment
df имеет 2, а listrdd - 8 partitionNumber. - person user-2147482338; 04.10.2019
comment
Я переделал список на 2, но теперь получаю TypeError: <lambda>() missing 1 required positional argument: 'y'. Вы можете мне помочь с этим, пожалуйста - person user-2147482338; 07.10.2019
comment
Это не должно быть проблемой .. вы используете ту же карту? - person vikrant rana; 07.10.2019
comment
Я думаю, что распаковка кортежей удалена в лямбда-функции. - person user-2147482338; 11.10.2019

вы можете join два dfs, например:

df2 = spark.createDataFrame()
df= df.join(df2, on=['index']).drop('index')

df2 будет содержать столбцы, которые вы хотите добавить к основному df.

person Elad Cohen    schedule 02.10.2019