ТЕХНОЛОГИЯ ЭКСПЕДИА ГРУПП - ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ

Подробное описание оконных функций Apache Spark

Оконные функции работают с группами данных и возвращают значения для каждой записи или группы.

В этом сообщении блога мы подробно рассмотрим оконные функции Apache Spark. Вам также могут быть интересны мои предыдущие сообщения об Apache Spark.

Во-первых, давайте посмотрим, что такое оконные функции и когда их следует использовать. Мы используем различные функции в Apache Spark, такие как month (вернуть месяц с даты), round (округлить значение) иfloor (дать минимальное значение для данного ввода) и т. Д., Которые будут выполняться для каждой записи и возвращать значение. для каждой записи. Затем у нас есть различные агрегированные функции, которые будут выполняться для группы данных и возвращать одно значение для каждой группы, например sum, avg, min, max и count. Но что, если мы хотим выполнить операцию с группой данных и иметь одно значение / результат для каждой записи? В таких случаях мы можем использовать оконные функции. Они могут определять ранжирование для записей, кумулятивное распределение, скользящее среднее или идентифицировать записи до или после текущей записи.

Давайте воспользуемся некоторыми примерами Scala API, чтобы узнать о следующих оконных функциях:

  • Совокупные: min, max, avg, count и sum.
  • Рейтинг: rank, dense_rank, percent_rank, row_num и ntile
  • Аналитический: cume_dist, lag и lead
  • Пользовательская граница: rangeBetween и rowsBetween

Для вашего удобства записная книжка Zeppelin, экспортированная как файл JSON, а также файл Scala доступны на GitHub.

Создать фрейм данных Spark

Теперь давайте создадим образец Spark DataFrame, который мы будем использовать в этом блоге. Сначала загрузим необходимые библиотеки.

Теперь мы создадим DataFrame с некоторыми фиктивными данными, которые мы будем использовать для обсуждения различных оконных функций.

Вот как выглядит наш DataFrame:

Оконные агрегатные функции

Давайте посмотрим на некоторые агрегированные оконные функции, чтобы увидеть, как они работают.

Во-первых, нам нужно определить спецификацию окна. Допустим, мы хотели бы получить агрегированные данные по отделу. Итак, мы определим наше окно на основе названия отдела (столбец: depname) в этом примере.

Создайте спецификацию окна для агрегатной функции

Применить агрегатную функцию к окну

Теперь внутри отдела (столбец: depname) мы можем применять различные агрегированные функции. Итак, давайте попробуем найти максимальную и минимальную зарплату в каждом отделе. Здесь мы выбрали только нужные столбцы (depName, max_salary и min_salary) и удалили повторяющиеся записи.

Выход:

+---------+----------+----------+
|  depname|max_salary|min_salary|
+---------+----------+----------+
|  develop|      6000|      4200|
|    sales|      5000|      4800|
|personnel|      3900|      3500|
+---------+----------+----------+

Теперь посмотрим, как это работает. Мы разделили данные по названию отдела:

Теперь, когда мы выполняем агрегатную функцию, она будет применена к каждому разделу и вернет агрегированное значение (в нашем случае min и max).

Примечание. Доступные агрегатные функции: max, min, sum, avg и count.

Функции ранжирования окон

В этом разделе мы обсудим несколько типов функций ранжирования.

Создать спецификацию окна для функции ранжирования

Теперь предположим, что мы хотели бы ранжировать сотрудников на основе их зарплаты в отделе. Сотрудник с самой высокой зарплатой будет иметь ранг 1, а тот, у кого меньше всего заработная плата, будет занимать последнее место. Здесь мы разделим данные по отделам (столбец: depname), а внутри отдела мы отсортируем данные по зарплате в порядке убывания.

Для каждого отдела записи сортируются по заработной плате в порядке убывания.

1. Ранговая функция: rank

Эта функция вернет ранг каждой записи в разделе и пропустит последующий ранг после любого повторяющегося ранга:

Выход:

+---------+-----+------+----+
|  depName|empNo|salary|rank|
+---------+-----+------+----+
|  develop|    8|  6000|   1|
|  develop|   11|  5200|   2|
|  develop|   10|  5200|   2|
|  develop|    9|  4500|   4|
|  develop|    7|  4200|   5|
|    sales|    1|  5000|   1|
|    sales|    4|  4800|   2|
|    sales|    3|  4800|   2|
|personnel|    2|  3900|   1|
|personnel|    5|  3500|   2|
+---------+-----+------+----+

Здесь мы видим, что некоторые ранги дублируются, а некоторые отсутствуют. Например, в отделе develop у нас есть 2 сотрудника с rank = 2 и ни один сотрудник с rank = 3 , потому что функция ранжирования будет сохранять тот же ранг для того же значения и соответственно пропускать следующие ранги.

2. Плотный ранг: density_rank

Эта функция вернет ранг каждой записи в разделе, но не пропустит ни одного ранга.

Выход:

+---------+-----+------+-----------+
|  depName|empNo|salary|desnse_rank|
+---------+-----+------+-----------+
|  develop|    8|  6000|          1|
|  develop|   10|  5200|          2|
|  develop|   11|  5200|          2|
|  develop|    9|  4500|          3|
|  develop|    7|  4200|          4|
|    sales|    1|  5000|          1|
|    sales|    3|  4800|          2|
|    sales|    4|  4800|          2|
|personnel|    2|  3900|          1|
|personnel|    5|  3500|          2|
+---------+-----+------+-----------+

Здесь мы видим, что некоторые ранги дублируются, но ранги не пропадают, как при использовании функции rank. Например, в develop отделе у нас есть 2 сотрудника с рангом = 2. Функция dense_rank сохранит тот же ранг для того же значения, но не пропустит следующие ранги.

3. Функция Row num: row_number

Эта функция назначит номер строки в окне. Если 2 строки будут иметь одинаковое значение для столбца упорядочивания, не определено, какой номер строки будет назначен каждой строке с одинаковым значением.

Выход:

+---------+-----+------+----------+
|  depName|empNo|salary|row_number|
+---------+-----+------+----------+
|  develop|    8|  6000|         1|
|  develop|   10|  5200|         2|
|  develop|   11|  5200|         3|
|  develop|    9|  4500|         4|
|  develop|    7|  4200|         5|
|    sales|    1|  5000|         1|
|    sales|    3|  4800|         2|
|    sales|    4|  4800|         3|
|personnel|    2|  3900|         1|
|personnel|    5|  3500|         2|
+---------+-----+------+----------+

4. Функция процентного ранга: percent_rank

Эта функция вернет относительный (процентильный) ранг внутри раздела.

Выход:

+---------+-----+------+------------+
|  depName|empNo|salary|percent_rank|
+---------+-----+------+------------+
|  develop|    8|  6000|         0.0|
|  develop|   10|  5200|        0.25|
|  develop|   11|  5200|        0.25|
|  develop|    9|  4500|        0.75|
|  develop|    7|  4200|         1.0|
|    sales|    1|  5000|         0.0|
|    sales|    3|  4800|         0.5|
|    sales|    4|  4800|         0.5|
|personnel|    2|  3900|         0.0|
|personnel|    5|  3500|         1.0|
+---------+-----+------+------------+

5. Функция N-плитки: ntile

Эта функция может дополнительно разделить окно на n групп на основе спецификации окна или раздела. Например, если нам нужно разделить отделы на три группы, мы можем указать ntile как 3.

Выход:

+---------+-----+------+-----+
|  depName|empNo|salary|ntile|
+---------+-----+------+-----+
|  develop|    8|  6000|    1|
|  develop|   10|  5200|    1|
|  develop|   11|  5200|    2|
|  develop|    9|  4500|    2|
|  develop|    7|  4200|    3|
|    sales|    1|  5000|    1|
|    sales|    3|  4800|    2|
|    sales|    4|  4800|    3|
|personnel|    2|  3900|    1|
|personnel|    5|  3500|    2|
+---------+-----+------+-----+

Оконные аналитические функции

Далее мы обсудим аналитические функции, такие как совокупное распределение, отставание и опережение.

1. Кумулятивная функция распределения: cume_dist

Эта функция дает кумулятивное распределение значений для окна / раздела.

Определите спецификацию окна и примените функцию cume_dist, чтобы получить кумулятивное распределение.

Выход:

+---------+-----+------+------------------+
|  depName|empNo|salary|         cume_dist|
+---------+-----+------+------------------+
|  develop|    7|  4200|               0.2|
|  develop|    9|  4500|               0.4|
|  develop|   10|  5200|               0.8|
|  develop|   11|  5200|               0.8|
|  develop|    8|  6000|               1.0|
|    sales|    4|  4800|0.6666666666666666|
|    sales|    3|  4800|0.6666666666666666|
|    sales|    1|  5000|               1.0|
|personnel|    5|  3500|               0.5
|personnel|    2|  3900|               1.0|
+---------+-----+------+------------------+

2. Функция запаздывания: запаздывание

Эта функция вернет значение, предшествующее смещению строк из DataFrame.

Функция lag принимает 3 аргумента (lag(col, count = 1, default = None)),
col: определяет столбцы, к которым необходимо применить функцию.
count: сколько строк нам нужно просмотреть назад.
default: определяет значение по умолчанию.

Выход:

+---------+-----+------+----+
|  depName|empNo|salary| lag|
+---------+-----+------+----+
|  develop|    7|  4200|null|
|  develop|    9|  4500|null|
|  develop|   10|  5200|4200|
|  develop|   11|  5200|4500|
|  develop|    8|  6000|5200|
|    sales|    4|  4800|null|
|    sales|    3|  4800|null|
|    sales|    1|  5000|4800|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+

Например, давайте найдем зарплату за 2 строки до текущей строки.

  • Для депо = develop, зарплата = 4500. Нет такой строки, которая была бы на 2 строки до этой строки. Таким образом, он будет равен нулю.

  • Для deptname = develop, salary = 6000 (выделено синим). Если пройти 2 строки раньше, мы получим 5200 в качестве зарплаты (выделено зеленым).

3. Ведущая функция: ведущий

Эта функция вернет значение после строк смещения из DataFrame.

Функция lead принимает 3 аргумента (lead(col, count = 1, default = None))
col: определяет столбцы, к которым должна применяться функция.
count: сколько строк нам нужно искать вперед / после текущей строки.
default: определяет значение по умолчанию.

Выход:

+---------+-----+------+----+
|  depName|empNo|salary| lag|
+---------+-----+------+----+
|  develop|    7|  4200|5200|
|  develop|    9|  4500|5200|
|  develop|   10|  5200|6000|
|  develop|   11|  5200|null|
|  develop|    8|  6000|null|
|    sales|    3|  4800|5000|
|    sales|    4|  4800|null|
|    sales|    1|  5000|null|
|personnel|    5|  3500|null|
|personnel|    2|  3900|null|
+---------+-----+------+----+

Попробуем поискать зарплату на 2 строки вперед / после текущей строки.

  • для depname = develop, salary = 4500 (выделено синим). Если мы перейдем на 2 строки вперед / назад, мы получим 5200 в качестве зарплаты (выделено зеленым).

  • для деп. = personnel, зарплата = 3500. В этом разделе нет такой строки, которая была бы на 2 строки вперед / после этой строки. так что мы получим null.

Пользовательское определение окна

По умолчанию границы окна определяются столбцом раздела, и мы можем указать порядок с помощью спецификации окна. Например, для отдела develop начало окна - минимальное значение зарплаты, а конец окна - максимальное значение зарплаты.

Но что, если мы захотим изменить границы окна? Следующие функции можно использовать для определения окна в каждом разделе.

1. rangeBetween

Используя функцию therangeBetween, мы можем определить границы явно.
Например, давайте определим начало как 100 и конец как 300 единиц от текущей зарплаты и посмотрим, что это означает. Начать как 100 означает, что окно будет начинаться с 100 единиц и заканчиваться значением 300 от текущего значения (начальное и конечное значения включены).

Определить спецификацию окна

L после начального и конечного значений означает, что значение относится к типу Scala Long.

Применить спецификацию пользовательского окна

Выход:

+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      null|
|  develop|   10|  5200|      null|
|  develop|   11|  5200|      null|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      5000|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      null|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+

Давайте теперь попробуем разобраться в выводе.

  • Для depname = develop, salary = 4200 начало окна будет (текущее значение + начало), что составляет 4200 + 100 = 4300. Конец окна будет ( текущее значение + конец), что составляет 4200 + 300 = 4500.

Поскольку существует только одно значение зарплаты в диапазоне от 4300 до 4500 включительно, то есть 4500 для develop отдела, мы получили 4500 как max_salary для 4200 (проверьте результат выше).

  • Аналогичным образом для имя_отдела = develop, зарплата = 4500 окно будет (start : 4500 + 100 = 4600, end : 4500 + 300 = 4800). Но нет значений заработной платы в диапазоне от 4600 до 4800 включительно для develop отдела, поэтому максимальное значение будет нулевым (см. Вывод выше).

Здесь можно использовать несколько специальных граничных значений.

  • Window.currentRow: для указания текущего значения в строке.
  • Window.unboundedPreceding: Это можно использовать для неограниченного начала окна.
  • Window.unboundedFollowing: Это можно использовать для неограниченного конца окна.

Например, нам нужно найти максимальную зарплату, которая больше 300 от зарплаты сотрудника. Итак, мы определим начальное значение как 300L и определим конечное значение как Window.unboundedFollowing:.

Выход:

+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      6000|
|  develop|    9|  4500|      6000|
|  develop|   10|  5200|      6000|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      null|
|    sales|    3|  4800|      null|
|    sales|    4|  4800|      null|
|    sales|    1|  5000|      null|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      null|
+---------+-----+------+----------+

Итак, для имя депо = personnel, зарплата = 3500. окно будет (start : 3500 + 300 = 3800, end : unbounded). Таким образом, максимальное значение в этом диапазоне составляет 3900 (проверьте результат выше).

Точно так же для имя_разработчика = sales, зарплата = 4800 окно будет (start : 4800 + 300, 5100, end : unbounded). Поскольку для sales department нет значений больше 5100, null результатов.

2. ряды между

С помощью rangeBetween мы определили начало и конец окна, используя значение столбца упорядочения. Однако мы также можем определить начало и конец окна с относительной позицией строки.

Например, мы хотели бы создать окно, в котором начало окна находится на одну строку раньше текущей, а конец - на одну строку после текущей.

Определить спецификацию пользовательского окна

Применить спецификацию пользовательского окна

Выход:

+---------+-----+------+----------+
|  depName|empNo|salary|max_salary|
+---------+-----+------+----------+
|  develop|    7|  4200|      4500|
|  develop|    9|  4500|      5200|
|  develop|   10|  5200|      5200|
|  develop|   11|  5200|      6000|
|  develop|    8|  6000|      6000|
|    sales|    3|  4800|      4800|
|    sales|    4|  4800|      5000|
|    sales|    1|  5000|      5000|
|personnel|    5|  3500|      3900|
|personnel|    2|  3900|      3900|
+---------+-----+------+----------+

Давайте теперь попробуем разобраться в выводе.

  • Для depname = develop, salary = 4500 будет определено окно с одной строкой до и после текущей строки (выделено зеленым). Таким образом, заработная плата в окне составляет (4200, 4500, 5200), а максимальная - 5200 (см. Вывод выше).

  • Точно так же для имя_отдела = sales, зарплата = 5000 будет определено окно с единицей до и после текущей строки. Поскольку после этой строки нет строк, в окне будут только 2 строки (выделены зеленым) с заработной платой (4800, 5000) и максимальным значением 5000 (см. Вывод выше).

Мы также можем использовать специальные границы Window.unboundedPreceding, Window.unboundedFollowing и Window.currentRow, как мы это делали ранее с rangeBetween.

Примечание. В rowsBetween порядок необязателен, но я использовал его для обеспечения единообразия результатов при каждом запуске.

Резюме

Надеюсь, вам понравилось узнавать об оконных функциях в Apache Spark. В этом блоге мы обсуждали использование оконных функций для выполнения операций с группой данных и получения одного значения / результата для каждой записи. Мы также обсудили различные типы оконных функций, такие как агрегатные, ранжирующие и аналитические функции, включая то, как определять пользовательские границы окна. Вы можете найти записную книжку Zeppelin, экспортированную как файл JSON, а также файл Scala на GitHub. В моем следующем блоге я расскажу о различных функциях работы с массивами, доступных в Apache Spark.