pyspark Измените значение столбца перед использованием groupby для этого столбца

У меня есть эти данные json, я хочу агрегировать по столбцу «отметка времени» ежечасно, суммируя данные в столбцах «b» и «a».

{"a":1 , "b":1, "timestamp":"2017-01-26T01:14:55.719214Z"}
{"a":1 , "b":1,"timestamp":"2017-01-26T01:14:55.719214Z"}
{"a":1 , "b":1,"timestamp":"2017-01-26T02:14:55.719214Z"}
{"a":1 , "b":1,"timestamp":"2017-01-26T03:14:55.719214Z"}

Это окончательный результат, который я хочу

{"a":2 , "b":2, "timestamp":"2017-01-26T01:00:00"}
{"a":1 , "b":1,"timestamp":"2017-01-26T02:00:00"}
{"a":1 , "b":1,"timestamp":"2017-01-26T03:00:00"}

Это то, что я написал до сих пор

df = spark.read.json(inputfile)
df2 = df.groupby("timestamp").agg(f.sum(df["a"],f.sum(df["b"])

Но как мне изменить значение столбца «timestamp» перед использованием функции groupby? Заранее спасибо!


person gashu    schedule 27.02.2017    source источник
comment
Этот ответ может оказаться полезным. Он показывает, как округлить проанализированный объект временной метки.   -  person santon    schedule 28.02.2017


Ответы (2)


from pyspark.sql import functions as f   

df = spark.read.load(path='file:///home/zht/PycharmProjects/test/disk_file', format='json')
df = df.withColumn('ts', f.to_utc_timestamp(df['timestamp'], 'EST'))
win = f.window(df['ts'], windowDuration='1 hour')
df = df.groupBy(win).agg(f.sum(df['a']).alias('sumA'), f.sum(df['b']).alias('sumB'))
res = df.select(df['window']['start'].alias('start_time'), df['sumA'], df['sumB'])
res.show(truncate=False)

# output:
+---------------------+----+----+                                               
|start_time           |sumA|sumB|
+---------------------+----+----+
|2017-01-26 15:00:00.0|1   |1   |
|2017-01-26 16:00:00.0|1   |1   |
|2017-01-26 14:00:00.0|2   |2   |
+---------------------+----+----+

f.window гораздо более гибкий

person Zhang Tong    schedule 28.02.2017
comment
Спасибо за ответ. На самом деле мне нужно только «2017-01-26 15:00:00.0» в столбце меток времени вместо «[2017-01-26 15:00:00.0,2017-01-26 16:00:00.0] '. Вы знаете, как я могу получить это? - person gashu; 28.02.2017

Я думаю, это единственный способ сделать это

df2 = df.withColumn("r_timestamp",df["r_timestamp"].substr(0,12)).groupby("timestamp").agg(f.sum(df["a"],f.sum(df["b"])

Есть ли лучшее решение для получения метки времени в требуемом формате?

person gashu    schedule 27.02.2017