Добавить столбец во вновь созданный фрейм данных, выбрав столбец из старого фрейма данных pyspark

Я читаю JSON, и у меня есть словарь (dictn), ключи которого говорят мне, какие все столбцы я должен выбрать из JSON df.

Я пытаюсь создать новый df, а затем добавить те столбцы, чьи ключи из dictn присутствовали в JSON, но я получаю приведенную ниже ошибку: Любая помощь в этом очень ценится, поскольку я действительно новичок в искра.

'Атрибут(ы) разрешения ip#238 отсутствует в операторе !Project [ip#238 AS ip#267].;;\n!Project [ip#238 AS ip#267]\n+- LogicalRDD false\

from pyspark.sql.functions import lit
from pyspark.sql.types import StructType
import json
from pyspark.sql.functions import explode

jsn={"body":[{"ip":"177.284.10.91","sg_message_id":"YcbG1IBnQ1-626TaUVg2bQ.filter1049p1las1-18982-5C868E5A-20.0","hostname":"d2214390ce89","useragent":"Mozilla/5.0 (Linux; Android 7.1.2; E6810) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.105 Mobile Safari/537.36","method_name":"mass_gifting","email":"[email protected]","timestamp":1554076768,"url":"https://engagement.test.com/b/genghisgrill","object_id":42813,"category":["42813-3-11-19-bottomless-is-back","713","mass_gifting"],"sg_event_id":"Krfn-yDfTG-CQ-o8zhTb0w","event":"click","klass":"3-11-19-bottomless-is-back","url_offset":{"index":3,"type":"html"},"rails_env":"production","user_id":78003906,"business_id":713}],"account":"testaccount"}

dictn={'ip':'string',
       'sg_message_id':'string',
       'hostname':'string',
       'method_name':'string',
       'email':'string',
       'timestamp':'bigint',
       'smtp-id':'string',
       'object_id':'bigint',
       'response':'string',
       'sg_event_id':'string',
       'tls':'string',
       'event':'string',
       'klass':'string',
       'user_id':'string',
       'rails_env':'string',
       'business_id':'bigint'}
schema = StructType([])
new_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)
a=[json.dumps(jsn)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)
x=df.select("body")
df1=df.withColumn("foo",explode("body")).select("foo.*")
for k1,v1 in dictn.items():
  if k1 in df1.columns:
      new_df=new_df.withColumn(k1,df1[k1])
  else:
      new_df=new_df.withColumn(k1,lit(10))
new_df.show()

person user7422128    schedule 29.12.2019    source источник


Ответы (1)


Вы получаете эту ошибку, потому что пытаетесь добавить новые столбцы, ссылаясь на столбцы из другого DataFrame, который на самом деле не поддерживается в Spark. Вопрос уже задан и на него дан ответ здесь: Добавить столбец из другого фрейма данных

Но для достижения того, что вы хотите здесь, вам просто нужно использовать select из df1, который дает вам новый DataFrame со списком столбцов, которые вы получаете из dict.

Это должно хорошо работать для вас:

select_expr = [col(c).alias(c) if c in df1.columns else lit(10).alias(c) for c, _ in dictn.items()]

new_df = df1.select(select_expr)

new_df.show()
person blackbishop    schedule 29.12.2019
comment
это дает мне: имя 'col' не определено - person user7422128; 29.12.2019
comment
добавить from pyspark.sql.functions import col - person blackbishop; 29.12.2019