Расчет продолжительности путем вычитания двух столбцов datetime в строковом формате

У меня есть Spark Dataframe, состоящий из серии дат:

from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd

rdd = sc.parallelizesc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
                                    ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
                                    ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
                                    ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
                                    ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

Я хочу найти duration путем вычитания EndDateTime и StartDateTime. Я решил, что попробую сделать это с помощью функции:

# Function to calculate time delta
def time_delta(y,x): 
    end = pd.to_datetime(y)
    start = pd.to_datetime(x)
    delta = (end-start)
    return delta

# create new RDD and add new column 'Duration' by applying time_delta function
df2 = df.withColumn('Duration', time_delta(df.EndDateTime, df.StartDateTime)) 

Однако это дает мне:

>>> df2.show()
ID  EndDateTime          StartDateTime        ANI            Duration
X01 2014-02-13T12:36:... 2014-02-13T12:31:... sip:4534454450 null    
X02 2014-02-13T12:35:... 2014-02-13T12:32:... sip:6413445440 null    
X03 2014-02-13T12:36:... 2014-02-13T12:32:... sip:4534437492 null    
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... sip:6474454453 null    
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... sip:8874458555 null  

Я не уверен, верен мой подход или нет. Если нет, я с радостью приму другой предложенный способ достижения этой цели.


person Jason    schedule 17.05.2015    source источник
comment
Вы пробовали отладку в REPL?   -  person dskrvk    schedule 18.05.2015
comment
@dskrvk У меня мало опыта отладки, так как я не разработчик. Однако я подозреваю, что проблема в том, как Spark передает данные функциям. Например, time_delta () работает на чистом Python. По какой-то причине некоторые функции Python / Pandas просто не работают. Например. import re def extract_ani (x): extract = x.str.extract (r '(\ d {10})') return extract Dates = Dates.withColumn ('Cell', extract_ani (Dates.ANI)) также выдает ошибки с Spark DataFrames, но работает, когда я конвертирую фрейм данных в RDD и использую функцию как часть sc.map   -  person Jason    schedule 19.05.2015
comment
В Scala я бы использовал TimestampType вместо StringType для хранения дат, а затем создал бы UDF для вычисления разницы между двумя столбцами. Я нигде не вижу, чтобы вы объявляли time_delta функцией, определяемой пользователем, но это необходимый шаг в Scala, чтобы заставить ее делать то, что вы пытаетесь сделать.   -  person David Griffin    schedule 19.05.2015
comment
Да, взгляните на Spark .apache.org / docs / latest / api / python / в pyspark.sql.functions.udf. Вам нужно создать time_delta как UDF   -  person David Griffin    schedule 19.05.2015
comment
@David Griffin, вы были правы :) Сначала я игнорировал регистрацию UDF, так как считал, что вам нужно регистрировать UDF только из-за того, что вы хотели использовать выражение select   -  person Jason    schedule 19.05.2015
comment
Между прочим, если бы вы использовали более строго типизированный язык, такой как Scala, вы бы получили жалобу на попытку передать столбцы в функцию, ожидающую аргументов String.   -  person David Griffin    schedule 19.05.2015
comment
Да, это была другая проблема ... Изначально я пытался использовать pd.to_datetime(), но это функция pandas, ожидающая столбцов.   -  person Jason    schedule 19.05.2015


Ответы (6)


Начиная с Spark 1.5, вы можете использовать unix_timestamp :

from pyspark.sql import functions as F
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('EndDateTime', format=timeFmt)
            - F.unix_timestamp('StartDateTime', format=timeFmt))
df = df.withColumn("Duration", timeDiff)

Обратите внимание на формат времени в стиле Java.

>>> df.show()
+---+--------------------+--------------------+--------+
| ID|         EndDateTime|       StartDateTime|Duration|
+---+--------------------+--------------------+--------+
|X01|2014-02-13T12:36:...|2014-02-13T12:31:...|     258|
|X02|2014-02-13T12:35:...|2014-02-13T12:32:...|     204|
|X03|2014-02-13T12:36:...|2014-02-13T12:32:...|     228|
|XO4|2014-02-13T12:37:...|2014-02-13T12:32:...|     269|
|XO5|2014-02-13T12:36:...|2014-02-13T12:33:...|     202|
+---+--------------------+--------------------+--------+
person Kamil Sindi    schedule 02.05.2016
comment
Вы можете разделить на 3600,0, чтобы преобразовать в часы df.withColumn("Duration_hours", df.Duration / 3600.0) - person Martin Tapp; 01.03.2018

Спасибо Дэвиду Гриффину. Вот как это сделать для использования в будущем.

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf

# Build sample data
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

# define timedelta function (obtain duration in seconds)
def time_delta(y,x): 
    from datetime import datetime
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')
    delta = (end-start).total_seconds()
    return delta

# register as a UDF 
f = udf(time_delta, IntegerType())

# Apply function
df2 = df.withColumn('Duration', f(df.EndDateTime, df.StartDateTime)) 

Применение time_delta() даст вам продолжительность в секундах:

>>> df2.show()
ID  EndDateTime          StartDateTime        Duration
X01 2014-02-13T12:36:... 2014-02-13T12:31:... 258     
X02 2014-02-13T12:35:... 2014-02-13T12:32:... 204     
X03 2014-02-13T12:36:... 2014-02-13T12:32:... 228     
XO4 2014-02-13T12:37:... 2014-02-13T12:32:... 268     
XO5 2014-02-13T12:36:... 2014-02-13T12:33:... 202 
person Jason    schedule 19.05.2015
comment
Пожалуйста, используйте (end-start) .total_seconds (). В противном случае вы получите неприятные сюрпризы вроде этого: time_delta ('2014-02-13T12: 36: 14.000', '2014-02-13T12: 36: 15.900') возвращает 86398 вместо -1.9 - person user2158166; 08.04.2016
comment
Этот код больше не работает. Продолжительность оказывается нулевой. Используя цеппелин, искру 1.6 - person Ravi; 30.07.2016

datediff(Column end, Column start)

Возвращает количество дней от начала до конца.

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html.

person j pavan kumar    schedule 15.08.2016

Это можно сделать в spark-sql, преобразовав дату строки в метку времени и затем получив разницу.

1: преобразовать в метку времени:

CAST(UNIX_TIMESTAMP(MY_COL_NAME,'dd-MMM-yy') as TIMESTAMP)

2: Получите разницу между датами с помощью функции datediff.

Это будет объединено во вложенную функцию, например:

spark.sql("select COL_1, COL_2, datediff( CAST( UNIX_TIMESTAMP( COL_1,'dd-MMM-yy') as TIMESTAMP), CAST( UNIX_TIMESTAMP( COL_2,'dd-MMM-yy') as TIMESTAMP) ) as LAG_in_days from MyTable")

Вот результат:

+---------+---------+-----------+
|    COL_1|    COL_2|LAG_in_days|
+---------+---------+-----------+
|24-JAN-17|16-JAN-17|          8|
|19-JAN-05|18-JAN-05|          1|
|23-MAY-06|23-MAY-06|          0|
|18-AUG-06|17-AUG-06|          1|
+---------+---------+-----------+

Ссылка: https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL

person Ayush Vatsyayan    schedule 09.01.2018

Используйте DoubleType вместо IntegerType

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf


# Build sample data
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

# define timedelta function (obtain duration in seconds)
def time_delta(y,x): 
    from datetime import datetime
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')
    delta = (end-start).total_seconds()
    return delta

# register as a UDF 
f = udf(time_delta, DoubleType())

# Apply function
df2 = df.withColumn('Duration', f(df.EndDateTime, df.StartDateTime))
person Prince Francis    schedule 04.10.2018

Вот рабочая версия для Spark 2.x, полученная из ответа

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.types import StringType, StructType, StructField

sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()

rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

# register as a UDF 
from datetime import datetime
sqlContext.registerFunction("time_delta", lambda y,x:(datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')-datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')).total_seconds())

df.createOrReplaceTempView("Test_table")

spark.sql("SELECT ID,EndDateTime,StartDateTime,time_delta(EndDateTime,StartDateTime) as time_delta FROM Test_table").show()

sc.stop()
person Sukhdeep Kharbanda    schedule 30.03.2017