Apache Spark - это среда распределенной обработки данных, которая благодаря своим возможностям подходит для любого контекста больших данных. Несмотря на то, что это относительно недавний продукт (первая лицензия BSD с открытым исходным кодом была выпущена в 2010 году, она была подарена Apache Foundation) 18 июня была выпущена третья основная версия, которая представляет несколько новых функций, включая адаптивное выполнение запросов (AQE). ), о котором мы и поговорим в этой статье.

Немного истории

Spark родился, прежде чем был подарен сообществу, в 2009 году в академическом контексте ampLab (любопытство: AMP - это аббревиатура от Algorithms Machine People) Калифорнийского университета в Беркли. Победившей идеей, лежащей в основе продукта, является концепция RDD, описанная в статье Устойчивые распределенные наборы данных: отказоустойчивая абстракция для кластерных вычислений в памяти », ведущим автором которой является Спарк Матей Захария. отец".

Идея заключается в решении, которое решает основную проблему доступных в то время моделей распределенной обработки (в первую очередь MapReduce): отсутствие уровня абстракции для использования памяти в распределенной системе. Некоторые сложные алгоритмы, которые широко используются в больших данных, например многие для обучения моделей машинного обучения или манипулирования структурами данных графа, многократно повторно используют промежуточные результаты обработки во время вычислений. Одноэтапная архитектура алгоритмов, таких как MapReduce, в таких обстоятельствах сильно страдает, поскольку необходимо записывать (а затем повторно считывать) промежуточные результаты вычислений в постоянном хранилище. Операции ввода-вывода в постоянном хранилище, как известно, обременительны для любого типа системы, даже более того, для развернутой системы из-за дополнительных накладных расходов, вносимых сетевыми коммуникациями. Концепция RDD, реализованная на Spark, блестяще решает эту проблему за счет использования памяти на промежуточных этапах вычислений на многоступенчатом движке DAG.

Другая веха (я перескакиваю, потому что я вникаю в достоинства программирования RDD и подробную историю Spark, хотя и очень интересную, выходящую за рамки целей статьи) - это введение в первую стабильную версию Spark (которая была подарена сообществу Apache. ) модуля Spark SQL.

Одной из причин успеха фреймворка Hadoop до появления Spark было распространение продуктов, которые добавляли функциональность к основным модулям. Среди наиболее часто используемых, безусловно, следует упомянуть Hive, уровень абстракции SQL поверх Hadoop. Несмотря на ограничения MapReduce, из-за которых выполнение более сложных SQL-запросов на этом механизме неэффективно после «перевода» Hive, то же самое до сих пор широко распространено в основном из-за простоты использования.

Лучший способ проследить историю уровня SQL в Spark - снова начать со справочных документов. Shark (предок искрового SQL), восходящий к 2013 году, и проект под названием Spark SQL: реляционная обработка данных в Spark, где представлен Catalyst, оптимизатор, который представляет собой сердце современной архитектуры.

Функции Spark SQL доступны разработчикам через объекты, называемые DataFrame (или Java / Scale Datasets в типобезопасном режиме), которые представляют RDD на более высоком уровне абстракции. Вы можете использовать DataFrame API через определенный DSL или через SQL.

Независимо от того, какой метод вы выберете для использования, операции DataFrame будут обрабатываться, транслироваться и оптимизироваться Catalyst (Spark начиная с версии 2.0) в соответствии со следующим рабочим процессом:

Что нового

Наконец, мы переходим к достоинствам адаптивного выполнения запросов - функции, которая на архитектурном уровне реализована на этом уровне. Точнее, это оптимизация, которая динамически взаимодействует между логическим планом и физическим планом, используя статистику времени выполнения, полученную во время выполнения различных этапов в соответствии с потоком, показанным на следующем изображении:

Затем поток выполнения Spark SQL в версии 3.0 становится следующим:

Подробнее об оптимизации

Поскольку структура AQE основана на расширяемой архитектуре, основанной на наборе правил оптимизации логического и физического плана, можно легко предположить, что разработчики планируют со временем реализовать дополнительные функции. В настоящее время в версии 3.0 реализованы следующие оптимизации:

  • Динамическое объединение перестановочных разделов
  • Динамическое переключение стратегий соединения
  • Динамическая оптимизация перекосов

давайте посмотрим на них один за другим, дотронувшись до них руками на примерах кода.

По поводу создания тестового кластера рекомендуем обратиться к ранее опубликованной статье: Как создать кластер разработки Apache Spark 3.0 на одной машине с помощью Docker.

Динамическое объединение разделов в случайном порядке

Общеизвестно, что операции перемешивания являются самыми дорогостоящими в Spark (как и в любой другой среде распределенной обработки) из-за времени передачи, необходимого для перемещения данных между узлами кластера по сети. К сожалению, в большинстве случаев они неизбежны.

Преобразования в наборе данных, развернутом в Spark, независимо от того, используете ли вы RDD или DataFrame API, могут быть двух типов: узкие и широкие. Для выполнения данных широкого типа необходимо, чтобы данные разделов перераспределялись по-разному между исполнителями. Печально известная операция перемешивания (и создание нового этапа выполнения)

Без AQE определение оптимального количества разделов DataFrame в результате выполнения широкого преобразования (например, объединений или агрегатов) было назначено разработчику путем установки свойства конфигурации spark.sql.shuffle.partitions (значение по умолчанию: 200). Однако, не вдаваясь в достоинства данных, очень сложно установить оптимальное значение, поскольку существует риск создания слишком больших или слишком маленьких разделов, что приведет к проблемам с производительностью.

Допустим, вы хотите выполнить запрос агрегирования для данных, группы которых несбалансированы. Без вмешательства AQE количество разделов будет таким, которое мы выразили (например, 5), а конечный результат может быть примерно таким, как показано на изображении:

Включение AQE вместо этого поместит данные из меньших разделов вместе в больший раздел сопоставимого размера с другими. Результат аналогичен показанному на рисунке.

Эта оптимизация запускается, когда для двух свойств конфигурации spark.sql.adaptive.enabled и spark.sql.adaptive.coalescePartitions.enabled установлено значение true. Поскольку по умолчанию для второго установлено значение true, для практического использования этой функции вам нужно только включить глобальное свойство для активации AQE.

На самом деле, собираясь проанализировать исходный код, вы обнаружите, что AQE фактически включен только в том случае, если запрос требует операций перемешивания или состоит из подзапросов:

и что есть свойство конфигурации, которое можно использовать для принудительного AQE даже при отсутствии одного из двух условий, указанных выше.

Количество разделов после оптимизации будет зависеть от настройки следующих параметров конфигурации:

  • spark.sql.adaptive.coalescePartitions.initialPartitionNum
  • spark.sql.adaptive.coalescePartitions.minPartitionNum
  • spark.sql.adaptive.advisoryPartitionSizeInBytes

где первый представляет начальное количество разделов (по умолчанию: spark.sql.shuffle.partitions), второй представляет минимальное количество разделов после оптимизации (по умолчанию: spark.default.parallelism), а третий представляет «предлагаемый» размер. разделов после оптимизации (по умолчанию: 64 Мб).

Чтобы проверить поведение функции динамической коалиции в произвольных разделах AQE, мы собираемся создать два простых набора данных (один следует понимать как таблицу поиска, в которой нам нужен второй набор данных. присоединиться).

Выборочный набор данных намеренно несбалансирован, транзакции нашей гипотетической «Очень большой компании» составляют около 10% от общего числа. У остальных компаний около 1%:

Давайте сначала проверим, что было бы без AQE.

Получим вывод:

Количество разделов без AQE: 50

Это именно то, что мы сами указали, установив свойство конфигурации spark.sql.shuffle.partitions.

Повторяем эксперимент, включив AQE.

Новый вывод будет:

Количество разделов с AQE: 7

В данном случае значение было определено на основе уровня параллелизма по умолчанию (количества выделенных ядер), то есть значения свойства конфигурации spark.sql.adaptive.coalescePartitions.minPartitionNum.

Теперь давайте попробуем, что получится, «предложив» целевой размер разделов (с точки зрения хранилища). Давайте установим его на 30 КБ, что соответствует нашим выборочным данным.

На этот раз вывод будет:

Количество разделов с AQE (рекомендуемый размер раздела 30 КБ): 15

независимо от количества ядер, выделенных в кластере для нашей работы.

Помимо положительного влияния на производительность, эта функция очень полезна для создания выходных файлов оптимального размера (попробуйте проанализировать содержимое выходных каталогов заданий, которые я создал в формате CSV, хотя и менее эффективен, чтобы вы могли легко проверить файлы).

Во второй и третьей части статьи мы попробуем две другие новые функции:

  • Динамическое переключение стратегий присоединения
  • Динамическая оптимизация перекосов

Будьте на связи!