pyspark, сравните две строки в кадре данных

Я пытаюсь сравнить одну строку в кадре данных со следующей, чтобы увидеть разницу в отметке времени. На данный момент данные выглядят так:

 itemid | eventid | timestamp
 ----------------------------
 134    | 30      | 2016-07-02 12:01:40
 134    | 32      | 2016-07-02 12:21:23
 125    | 30      | 2016-07-02 13:22:56
 125    | 32      | 2016-07-02 13:27:07

Я попытался сопоставить функцию с фреймом данных, чтобы можно было сравнивать следующим образом: (примечание: я пытаюсь получить строки с разницей более 4 часов)

items = df.limit(10)\
          .orderBy('itemid', desc('stamp'))\
          .map(lambda x,y: (x.stamp - y.stamp) > 14400).collect()

Но я получаю следующую ошибку:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe

Я считаю, что это связано с тем, что я неправильно использую функцию карты. Будем признательны за помощь в использовании карты или другое решение.

ОБНОВЛЕНИЕ: ответ @zero323 был информативным относительно моего неправильного использования сопоставления, однако в системе, которую я использую, используется версия Spark до 2.02, и я работаю с данными в Cassandra.

Мне удалось решить это с помощью mapPartitions. Смотрите мой ответ ниже.

ОБНОВЛЕНИЕ (27.03.2017): С тех пор, как я изначально пометил ответ в этом сообщении, мое понимание Spark значительно улучшилось. Я обновил свой ответ ниже, чтобы показать мое текущее решение.


person ivywit    schedule 06.07.2016    source источник


Ответы (2)


Да, вы неправильно используете функцию map. map одновременно работает с одним элементом. Вы можете попробовать использовать оконные функции следующим образом:

from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

df = (
    sc.parallelize([
        (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
        (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "eventid", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

w = Window.partitionBy("itemid").orderBy("timestamp")

diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")

df.withColumn("diff", diff)
person zero323    schedule 06.07.2016
comment
Хотя убедитесь, что вы используете HiveContext или Spark 2.02 здесь. - person Jeff; 06.07.2016
comment
Спасибо за понимание карты, но, видимо, для использования window требуется контекст Hive. Система, над которой я работаю, это просто искра с Кассандрой. Я обновлю вопрос, чтобы отметить это. - person ivywit; 06.07.2016
comment
HiveContext не требует Hive per see. Для этого требуется только Spark, созданный с поддержкой Hive (что по умолчанию в случае двоичных файлов предварительной сборки). - person zero323; 06.07.2016
comment
У меня возник вопрос по поводу строки diff = . Разве в самом окне нет функции диапазона, в которой вы можете указать, что вы получаете текущую и следующую строку? Но вы решили это по-другому. - person Matthias; 07.07.2016
comment
@Matthias Как это поможет здесь? - person zero323; 07.07.2016
comment
Я просто пытаюсь понять использование. Я вижу, что есть функция Window.rowsBetween, которую следует использовать для перебора строк. И что есть функция, использующая lag(...., 1), которая указывает на следующую строку. Так мне нужно Window.rowsBetween или нет или когда? После публикации DataBricks и вот этот. - person Matthias; 07.07.2016
comment
@Matthias Короткий ответ, нет. Длинный - SQL:03', если я правильно помню :) - person zero323; 07.07.2016
comment
Работает, но я получаю странные результаты. Для метки времени T1: 2015-04-23 00:00:38.0 и T2: 2015-04-23 00:05:44.0 я получаю дельту 306. Но должно быть 506, верно? Временная метка имеет тип даты и времени, а не строку. - person Matthias; 08.07.2016
comment
@ Матиас Почему? 4 минуты 22 секунды => всего 306 секунд. - person zero323; 08.07.2016
comment
аааа, я думаю в 10 шагах вместо 60 ;) спасибо за подсказку - person Matthias; 08.07.2016

Комментарий @ShuaiYuan к исходному ответу правильный. За последний год я намного лучше понял, как работает Spark, и фактически переписал программу, над которой работал для этого поста.

НОВЫЙ ОТВЕТ (27 марта 2017 г.)
Чтобы выполнить сравнение двух строк фрейма данных, я использовал RDD. Я группирую данные по ключу (в данном случае по идентификатору элемента) и игнорирую идентификатор события, поскольку он не имеет значения в этом уравнении. Затем я сопоставляю лямбда-функцию со строками, возвращая кортеж ключа и список кортежей, содержащих начало и конец промежутков событий, который получен из функции «findGaps», которая выполняет итерацию по списку связанных значений (отсортированных временных меток). к каждому ключу. Как только это будет завершено, я отфильтрую ключи без временных промежутков, а затем flatMapValues, чтобы вернуть данные в формате, более похожем на sql. Это делается с помощью следующего кода:

# Find time gaps in list of datetimes where firings are longer than given duration.  
def findGaps(dates, duration):
    result = []
    length = len(dates)

    # convert to dates for comparison
    first = toDate(dates[0])
    last = toDate(dates[length - 1])
    for index, item in enumerate(dates):
        if index < length -1 and (dates[index + 1] - item).total_seconds() > duration:
            # build outage tuple and append to list
            # format (start, stop, duration)
            result.append(formatResult(item, dates[index + 1], kind))
    return result

outage_list = outage_join_df.rdd\
                            .groupByKey()\
                            .map(lambda row: (
                                     row[0],
                                     findGaps(
                                         sorted(list(row[1])), 
                                         limit
                                     )
                                  )
                            )\
                            .filter(lambda row: len(row[1]) > 0)\
                            .flatMapValues(lambda row: row)\
                            .map(lambda row: (
                                 row[0]['itemid'],     # itemid
                                 row[1][0].date(),     # date
                                 row[1][0],            # start
                                 row[1][1],            # stop
                                 row[1][2]             # duration
                            ))\
                            .collect()

ИСХОДНЫЙ ОТВЕТ (НЕВЕРНЫЙ)
Мне удалось решить эту проблему с помощью mapPartitions:

def findOutage(items):
    outages = []

    lastStamp = None
    for item in items:
        if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
            outages.append({"item": item.itemid, 
                            "start": item.stamp.isoformat(),
                            "stop": lastStamp.isoformat()})
        lastStamp = item.stamp
    return iter(outages)

items = df.limit(10).orderBy('itemid', desc('stamp'))

outages = items.mapPartitions(findOutage).collect()

Спасибо всем за помощь!

person ivywit    schedule 06.07.2016
comment
Необходимо убедиться, что набор данных разделен на timestamp. - person shuaiyuancn; 07.07.2016
comment
@ShuaiYuan, ты прав. Я обновил свой ответ, чтобы показать мое текущее решение проблемы. - person ivywit; 28.03.2017