Введение

За последние десять лет или около того создание и выполнение заданий Spark стало значительно проще, в основном благодаря:

  • API высокого уровня, упрощающие выражение логики.
  • Управляемые облачные платформы —хорошо масштабируемое хранилище объектов и эфемерные кластеры в один клик на основе точечных экземпляров значительно упрощают выполнение заданий (и откладывают необходимость их оптимизации).

В то время как создание логики в Spark и выполнение заданий стало значительно проще, инженерия конвейеров на основе Spark по-прежнему является формой искусства, и очень немногие из них широко распространены. принятые стандарты и лучшие практики.

Многие команды, впервые создающие пайплайны, обнаруживают, что существует крутая кривая обучения и отсутствие задокументированных передовых практик даже для самых основных инженерных проблем, таких как упаковка, развертывание, тестируемость, изоляция среды и наблюдаемость.

По иронии судьбы, это резко контрастирует с конвейерами на основе SQL, которые традиционно представляли собой черную дыру инженерных практик, после того как сообщество приняло DBT в качестве стандарта де-факто для решения многих из этих проблем.

Цель

Цель этого поста — осветить распространенные проблемы разработки программного обеспечения и рецепты создания и запуска пакетных конвейеров на основе Spark в облаке.

Это не полное руководство — идея состоит в том, чтобы охватить некоторые из наиболее распространенных и основных областей, применимых практически к каждому пайплайну.

Излишне говорить, что описанные здесь практики и предложения нельзя считать «стандартами», но, надеюсь, они могут стать разумной отправной точкой.

Примеры основаны на PySpark на Databricks, но принципы должны также применяться к Java или Scala, работающим на других облачных платформах.

Выходит за рамки

  • Крупномасштабные среды / сложные конвейеры
  • Искровой код или оптимизация на уровне кластера
  • Аналитическое тестирование или методология качества данных

Повестка дня

Препродакшн (этот пост)

Пакетные конвейеры данных 101

  • Извлечь, преобразовать, загрузить
  • Работа со временем в инкрементальных запусках
  • Разделение ввода и вывода
  • Объединение результатов

Разработка, тестирование и упаковка конвейеров

  • Базовая структура кода
  • Тесты
  • Структура проекта
  • Упаковка
  • Локальный рабочий процесс
  • CI

Среды и развертывание

  • Окружающая среда
  • Управление конфигурацией
  • Механизм развертывания

Производство и операции (следующий пост)

Выполнение рабочих заданий

  • Наблюдаемость за данными и конвейером
  • Предотвращение отравления данных
  • Параллельное или синее/зеленое развертывание

Распространенные сценарии работы

  • Повторы и повторные запуски
  • Заполнение данных

Пойдем!

Конвейеры пакетных данных 101

Извлечь, преобразовать, загрузить

Конвейер пакетных данных обычно выполняет один или несколько шагов ETL.

Каждый шаг следует схеме:

  • Извлечь — загрузить данные из некоторого места (например, S3)
  • Преобразовать — выполнять агрегацию, фильтровать, применять пользовательские функции и т. д.
  • Загрузить — записать вывод в какое-либо место (например, другой путь на S3)

Иногда конвейер моделируется как DAG таких шагов.

Инкрементальные прогоны и концепция времени

Большинство пакетных конвейеров предназначены для постепенной обработки данных, т.е.:

  • Читать «новые данные»
  • Обработайте это
  • Объедините вывод с результатами предыдущих запусков.

Определение «новых данных»

Большинство (но не все) пакетных заданий выполняются по расписанию, например. ежедневно. Поэтому большинство команд начинают с определения «новых» данных следующим образом:
«каждый день выполнять задание с данными, полученными накануне».

Другими словами, каждый запуск конвейера отвечает за обработку (фиксированного) временного окна данных.

Этот подход известен как «переворачивающиеся окна»: смежные таймфреймы фиксированного размера без перекрытия между ними.

Время стены и время события

Наивное решениекоторое не работает для обработки временных окон с конвейерами:

  • Запускайте конвейер для запуска каждый, например. 24 часа
  • Пусть код конвейера использует «время стены» — например, значение now() минус 24 часа в качестве временного окна, необходимого для обработки

Причина, по которой это не работает, например. случаи, когда пайплайн вышел из строя на выходных и теперь нужно запустить его на данных 72-часовой давности.

Кроме того, большая часть аналитики связана со временем, когда события произошли на практике, а не со временем, когда мы получили известие о них или обсудили их (хотя в некоторых случаях это также важно).

Таким образом, лучшим подходом является определение временного окна каждого запуска на основе «времени события» —т.е. время, когда произошло реальное событие, вызвавшее изменение данных.

Это означает, что каждому запуску задания назначается фиксированное временное окно, которое не зависит напрямую от времени фактического выполнения задания.

(примечание — в некоторых случаях использования, например, в машинном обучении, этот подход может нуждаться в доработке, так как он может привести к перекосу между поездами и тестами).

Использование планировщиков для создания конвейера, запускаемого во временном окне

Распространенным подходом к запуску конвейера во временном окне является использование общего планировщика (например, Airflow, Prefect и т. д.).

  • Планировщик настроен на запуск конвейера, например. каждый день в какое-то время
  • Каждый триггер логически связан с отдельным окном времени события, которое необходимо обработать (например, предыдущая календарная дата).
  • Каждый раз, когда планировщик запускает конвейер, он должен передать ему время запуска.
  • Код использует время триггера для определения временного окна, которое необходимо обработать.
  • Если наш планировщик проснулся позже, чем ожидалось, скажем, в 04:01, он все равно должен передать конвейеру «правильное» время запуска.
  • Если запуск триггера завершился неудачно, конвейер следует запустить повторно с точным исходным временем триггера в качестве параметра.
  • Если сам планировщик был отключен, он должен позаботиться о создании триггеров, которые были пропущены, пока он был отключен.

Так что между кодом планировщика и конвейера довольно много тонкостей и неявных контрактов для корректного управления временными окнами.

Кроме того, каждый планировщик имеет несколько иную интерпретацию этих контрактов.

Вот несколько примеров из Airflow, Prefect и Databricks.

Один из выводов заключается в том, что конвейер должен иметь возможность принимать параметр, представляющий время срабатывания планировщика, и определять временное окно на основе этого значения.

Поздние данные и водяной знак

К сожалению, данные имеют приводящую в бешенство тенденцию поступать с опозданием.

Скажем, мы хотим запустить наш пакет на целых календарных днях «реальных» данных.
Если наш планировщик срабатывает ровно в полночь, возможно, что некоторые данные предыдущего дня все еще записываются и будут пропущенный.

Простой подход состоит в том, чтобы добавить «буфер» к нашему планированию времени, например. установите триггер на срабатывание в 04:00, чтобы обработать данные за предыдущую дату.
Размер буфера можно угадать или даже определить эмпирически.

Несколько вещей, на которые стоит обратить внимание:

  • Фиксированный буфер не является гарантией предотвращения поздних данных, это всего лишь метод управления рисками.
  • Существует явный компромисс между размером буфера, который мы берем (т. е. уверенностью в полноте данных), и свежестью результатов.
  • Если ваш конвейер обрабатывает выходные данные другого пакетного процесса, вы, как правило, больше подвержены задержкам (например, при сбое вышестоящего пакета).
  • Данные, которые поступили позже, чем буфер, трудно заметить, если вы не предпримете какие-либо меры по их устранению.

Оказывается, управление просроченными данными и принятие решения о том, когда безопасно обрабатывать данные с минимальным риском, — довольно сложная проблема, особенно в системах, где результаты должны быть свежими.

Чтобы найти компромисс между свежестью и полнотой, вам нужно подумать о концепции водяных знаков — т. е. приблизительное последнее время, которое мы можем считать достаточно полным для обработки — как а также как обнаруживать и реагировать на случаи, когда более старые данные поступают с большей, чем ожидалось, задержкой.

Подводя итог: для большинства пакетных заданий, которые выполняются нечасто (ежедневно/еженедельно), использование буфера в расписании является простой эвристикой для устранения некоторых задержек в данных.

Что касается других случаев, вам нужно подумать об этих предположениях и, возможно, прочитать дополнительную информацию по теме, например. здесь.

Разделение ввода

Имея дело с большими объемами данных, мы должны организовать их так, чтобы их можно было легко сузить до той части, которую мы хотим обработать.

Spark поддерживает понятие разделения данных, когда мы организуем данные в папках, как структуры в файловой системе, каждая из которых представляет раздел данных некоторым значением столбца.

Если разделы названы в соответствии с определенным соглашением и содержат имя столбца, мы можем добавить условие о значении столбца раздела, которое Spark может протолкнуть и использовать в качестве фильтра для загрузки данных.

Простой пример — разделить данные по дате, а затем добавить условие, которое сообщает искре, что мы хотим извлечь данные только из определенного диапазона дат.

  • Чтобы облегчить правильную обработку, особенно если конвейер выполняет какую-либо агрегацию, лучше всего создавать разделы на основе времени события и времени стены, в котором они были записаны.
  • Рекомендуется применять эту временную фильтрацию на этапе извлечения и оставлять логику преобразования нейтральной.
  • Spark автоматически распознает только фиксированный набор форматов для папок разделов на основе даты.
window_start = calculate_window_start(trigger)
window_end = calculate_window_end(trigger)
df = spark.read.json("s3://my-bucket").filter(f"created_dt >={window_start} and created_dt <= {window_end}")

Разделение вывода

Подобно разделению входных данных, нам обычно также необходимо разделить выходные данные.

Когда обработка представляет собой простое преобразование

В простейшем случае, когда конвейер выполняет преобразование (вместо агрегации), можно разделить выходные данные на основе того же столбца, что и для входных данных.

Это помогает нам понять для каждой строки на входе, где мы ожидаем найти ее на выходе.

Когда обработка представляет собой агрегацию

Скажем, наша работа должна суммировать количество кликов на страницу нашего веб-сайта.

На странице А было три клика, которые произошли в моменты времени t1, t2 и t3 в пределах нашего временного окна.

Результатом этого агрегирования будет строка, указывающая, что у A было 3 клика.
Это можно рассматривать как «агрегированное» или «сложное» событие.
Когда это событие «произошло»?

Один из способов — определить t3 как время «сложного события», поскольку это время, в течение которого идеальная система выдала бы значение «3».

Несмотря на аналитическую точность, генерирование времени события для каждого ключа часто приводит к проблемам с управлением данными во время обратного заполнения и повторного запуска, которые требуют перезаписи результатов (это будет обсуждаться позже).

Альтернативой, которая менее точна с аналитической точки зрения, но решает многие операционные проблемы, является использование единой даты для всего вывода — конец окна.

Хотя это не самое «точное» определение времени, оно не является неверным, поскольку указывает допустимое время, в течение которого система считала событие «истинным».

Схема секционирования выходных данных влияет на то, как вы работаете с обратной засыпкой, а также на то, как вы добавляете новые приращения данных к существующим наборам данных. Подробнее об этом во второй части этого руководства.

Слияние результата инкрементного прогона с большим набором данных

По завершении инкрементного прогона его результаты необходимо объединить со всеми ранее рассчитанными данными.

Важным свойством работы конвейера и, в частности, слияния вывода является то, что мы хотим, чтобы он был идемпотентным, т. е. позволяют нам повторить запуск задания и слияние несколько раз без изменения результата.

При записи результатов в хранилище объектов у нас есть несколько общих стратегий:

  • Добавить — добавить новый вывод к существующему, в том числе добавить новые файлы в существующие разделы. Нечасто, так как повторные показы для одного и того же диапазона дат могут привести к дублированию данных.
  • Перезапись — простая стратегия, которая работает в случае, если все прогоны обрабатывают окна с фиксированным временем и выходные данные соответственно разбиваются на разделы.
    Перезапись работает только для целых разделов, а не для отдельных записей.
    Обычно для этого требуется настроить динамическую перезапись раздела (см. spark.sql.sources.partitionOverwriteMode).
  • слияние — некоторые из современных форматов файлов, такие как delta lake или Iceberg, поддерживают понятие слияния добавочных результатов.
    процесс слияния включает в себя операцию, подобную соединению между новыми данными и старыми. данные по какому-либо условию.
    Несопоставленные записи с любой стороны (новое или старое значение) могут быть вставлены или удалены. Совпадающие записи могут быть объединены по столбцам с использованием бизнес-логики.

Выбор правильной стратегии — дело тонкое.

Объединение дает полную корректность и гибкость, но значительно дороже с точки зрения вычислений. В случае, когда 99% прогонов не обновляют данные, а добавляют или заменяют их, это слишком высокая цена.

Перезапись является дешевой, но более чувствительной к пограничным случаям, чтобы гарантировать, что перезапись не приведет к потере данных.

ACID-свойства чтения/записи секционированных данных

Представьте себе следующий сценарий: производитель занят слиянием своих результатов с набором данных, и в то же самое время потребитель начинает читать весь набор данных.

Наивно читатель может получить противоречивое представление данных (некоторые из них записаны, а некоторые нет).

Это очень важная задача, но до недавнего времени при работе в облаке требовалось достаточно тщательного проектирования, поскольку решения для хранения объектов не предлагают операции атомарного переименования.

Если вам интересна история вопроса, вот некоторая предыстория.

Примерно за последние два годаэта проблема была более или менее решена благодаря использованию формата данных, поддерживающего ACID, например delta-lake или Iceberg.

К сожалению, не все восходящие/нисходящие системы могут читать или записывать эти форматы данных.

Например, если вы загружаете данные на S3 из пожарного шланга Kinesis или читаете выходные данные конвейера из какой-либо старой технологии хранилища данных.

В этих случаях вам все равно придется решать проблемы каким-то другим способом. Сообщение в блоге, упомянутое ранее довольно подробно объясняет альтернативы.

Пример простых инкрементных конвейеров

  • Разделите входные данные по дате (или по часам, если вы планируете работать в окнах, не превышающих один день).
  • Управляйте размером временного окна (например, 24 часа) с помощью конфигурации
  • Попросите планировщик запустить конвейер с некоторым «буфером», чтобы данные могли полностью поступать.
  • Попросите планировщик передать «время срабатывания» в качестве параметра запуска.
  • Считайте данные, относящиеся к временному окну, используя фильтры, которые искра может продавить.
  • Разделение вывода:
    — Если ваш конвейер выполняет только преобразование — сохраните выходной раздел идентичным входному разделу
    — Если ваш конвейер выполняет агрегирование — разделите вывод по дате окончания (или времени) каждого окна бежать обработанный
  • Добавление инкрементных результатов:
    — используйте режим «перезаписи», если ваши данные редко обновляют записи (но в основном добавляют или заменяют их), в идеале с форматом данных, сохраняющим ACID.
    — используйте режим «слияния», если обновления отдельных записей являются общими, и у вас есть прикладная логика для их применения.

Проектирование, тестирование и упаковка трубопроводов

Разработка кода конвейера

Извлечь и загрузить

Входные данные обычно являются частью среды.
То же самое относится и к приемнику данных — месту, где мы сохраняем выходные данные.

В результате рекомендуется:

  • Изолируйте фазы Extract и Load от фазы Transform.
  • Сделайте эти функции легко настраиваемыми, чтобы одна и та же кодовая база могла работать, например, на ваш локальный ноутбук против облака.

Преобразование

Преобразования — это то место, где находится большая часть бизнес-логики.

Чтобы сделать трансформацию модульной и проверяемой, обычно ее разбивают на несколько функций — например, иметь «основную» функцию преобразования, которая вызывает меньшие функции, каждая из которых принимает один или несколько кадров данных Spark и возвращает один или несколько кадров данных Spark.

При написании функций преобразования рекомендуется использовать функциональный стиль.

Конфигурация

Даже самый простой конвейер требует какой-то поддержки для внедрения в него конфигурации.

Как правило, вы хотите иметь возможность настроить поведение извлечения и загрузки, чтобы вы могли запускать код в разных средах, где данные находятся в разных путях и форматах.

Кроме того, часто существуют прикладные конфигурации, связанные с самой логикой, которые могут зависеть или не зависеть от среды.

Подробнее о настройке позже.

Сеанс искры

Для запуска любого конвейера требуется инициализированный сеанс Spark.

Способ инициализации сеанса часто зависит от среды, в которой выполняется конвейер.

Например, некоторые инициализации, которые происходят при работе в отслеживаемом кластере, могут быть неприменимы или неуместны для сценария модульного тестирования.

В результате обычно рекомендуется инициализировать сеанс вне самого конвейера и передать его в конвейер в качестве параметра.

class Task:
    
    def __init__(self, spark: SparkSession, config: JobConfig):
        self.config = config
        self.spark = spark
        
    def main(self):
        extracted = self.extract()
        transformed = self.transform(**extracted)
        return self.load(transformed)

    def extract(self):
        pass
   
    def transform(self, dataframes):
        pass
   
    def load(self, transformed, uri=None):
        pass

Тесты

Если вы разработали код в соответствии с вышеизложенным, его должно быть достаточно легко протестировать.

Большинство тестов охватывают функции преобразования, многие из которых принимают один или несколько фреймов данных и возвращают один или несколько фреймов данных.

Для настройки тестов нам необходимо:

  • Инициализировать сеанс искры для тестов (например, как фикстуру в pytest)
  • Создание кадров данных и ожиданий в памяти
  • Вызвать функции преобразования
  • Утверждение, что выходные данные функций такие же или похожие на ожидаемые результаты

См., например, этот пост.

Модульные тесты

Модульные тесты самого низкого уровня обычно используют очень небольшое количество входных записей для каждого случая, и они часто определяются программно как часть кода теста.

В качестве примера (из сообщения в блоге выше):

@pytest.fixture(scope="session")
def stock_data(spark):
    schema = StructType([
        StructField("stock", StringType()),
        StructField("date", StringType()),
        StructField("price", DoubleType())
    ])
    data = [
        {"stock": "AA", "date": "20180902", "price": 200.0},
        {"stock": "AA", "date": "20181002", "price": 300.50},
        {"stock": "AA", "date": "20181102", "price": 400.50},
        {"stock": "BB", "date": "20180902", "price": None},
        {"stock": "BB", "date": "20181002", "price": 450.50},
        {"stock": "BB", "date": "20181102", "price": 200.50},
    ]
    df = spark.createDataFrame([Row(**x) for x in data], schema)
    df.cache()
    df.count()
    return df

Компоненты или интеграционные тесты

Здесь механизм срабатывания кода идентичен, но тестируемый блок больше.

В результате часто требуются большие входные данные, например. тот, который соответствует «реальной» схеме данных.

Обычно входные данные для более крупных тестов помещаются в файлы, организованные в папку данных вместе с самими тестами.

Например, чтобы запустить весь конвейер на тестовом входе, можно написать тест следующим образом (где параметры обычно являются фикстурами):

def test_end_to_end(session, input_path, tmp_path, expected_path):
    config = {"input_path": input_path, "output_path": tmp_path}
    job = Task(session, config)
    job.main()
    output = load(tmp_path)
    expected = load(expected_path)
    assert_frame_equals(output, expected)

Входная точка

До сих пор мы запускали части нашей логики из модульных тестов.

Чтобы запустить конвейер PySpark как отдельное задание (будь то локально или удаленно), нам обычно нужно предоставить точку входа.

Роль точки входа заключается в следующем:

  • Разобрать параметры командной строки
  • Прочтите любые файлы конфигурации, если это необходимо
  • Создать сеанс
  • При необходимости создайте конвейер
  • Вызов основной функции конвейера
class Task:
    pass
def entrypoint(argv):
    args = parse_arguments(argv)
    config = load_configuration(args)
    session = initialize_session(config)
    task = Task(session=session, config=conf_model)
    task.main()
    print("done")


if __name__ == "__main__":
    entrypoint(sys.argv)

Примечание: точка входа является относительно общей и может использоваться несколькими конвейерами (например, путем передачи имени конвейера в качестве параметра).

Сквозные тесты и локальный запуск пайплайна

Прежде чем запускать конвейер на удаленных кластерах, обычно очень полезно иметь возможность запустить его локально от начала до конца с помощью бегуна в памяти.

Здесь мы используем точку входа для выполнения всей инициализации сеанса и объектов конфигурации, контролируя, например. пути ввода/вывода для задания.

Написание тестов, которые запускают конвейер от начала до конца, возможно, поскольку точка входа позволяет вам вводить аргументы командной строки.

Отладка возможна путем создания средства запуска из среды IDE, которое вызывает модуль точки входа с правильными параметрами.

Наконец, вы можете вызвать конвейер из оболочки до или после его упаковки.

Структура проекта и упаковка

Пока что все, что у нас есть, — это довольно стандартный проект Python.

Нам понадобятся все стандартные леса —

  • Управление требованиями
  • Инструмент упаковки проекта
  • действия на гитхабе
  • хуки перед фиксацией
  • и т. д.

Вы можете использовать свой любимый современный шаблон пакета Python, например этот.

Упаковка

На данный момент у нас есть чистый проект Python, поэтому имеет смысл упаковать его в виде файла .whl, используя стандартный механизм упаковки, основанный на setuptools.

если вы следовали современному шаблону проекта Python, вы выбрали свой механизм упаковки (setup.py, pyproject.toml, поэзия и т. д.)

Часто хорошей идеей является предоставление функции точки входа в качестве API для пакета, например. "вот так".

Это позволяет вам установить упакованный конвейер и запустить его напрямую:

pip install my-pipeline
my_task arg1 ...

В большинстве случаев, по крайней мере, в Python, в упаковке не так много загадок, поэтому вы сможете без особых проблем упаковать свой локальный код на своем ноутбуке, что также полезно для локального тестирования.

Необязательно — запуск локального кода на удаленном кластере

В целях тестирования часто полезно иметь возможность развернуть локальный код (после упаковки) на удаленном кластере в среде разработки/тестирования.

Это может сэкономить много времени при поиске и устранении тривиальных проблем, связанных со средой (по сравнению с ожиданием сбоя какого-либо конвейера CI/CD позже).

Если ваш механизм развертывания хорошо разработан, это обычно вполне осуществимо.

Мы обсудим развертывание позже в этом посте.

Сводка по локальному рабочему процессу

CI

Здесь вы можете следовать классическому рабочему процессу Python CI, используя, например. действия github, в том числе:

  • Проверить код
  • Запустите хук pre-commit для линтинга и других принудительных мер
  • Запуск модульных тестов
  • Запустите более крупные тесты (например, сквозной тест)
  • Упаковать конвейер
  • Отправьте его в какой-нибудь (частный) реестр артефактов

Мы затронули тему развертывания на удаленном кластере, теперь пришло время заняться этой темой более подробно.

Развертывание и управление средой

Вот тут-то и начинаются сложности.

  • Что означает «среда» в контексте конвейера данных?
  • Что включает в себя среда?
  • Как среды изолированы друг от друга?
  • Как разные среды влияют на мою кодовую базу и dev. процесс?
  • Какие артефакты мне нужно развертывать каждый раз?

Давайте начнем распаковывать эти вопросы.

Что такое среды и какие из них мне нужны?

Как правило, «среда» — это набор ресурсов, в которых создаваемое вами программное обеспечение может быть развернуто и запущено в различных сценариях.

Обычно процессы разработки программного обеспечения включают некоторые или все следующие среды:

  • Локальный – запуск системы на локальном компьютере или рабочем CI (см. локальный рабочий процесс выше).
  • Разработка/тестирование — эта среда может быть личной, выделенной для каждой команды или общей для нескольких команд. Обычно используется для раннего тестирования функций и, при необходимости, CI.
  • QA — после того, как официальная версия или релиз-кандидат были отправлены, некоторые команды развертывают их в отдельной чистой среде для QA (в основном, когда QA частично выполняется вручную).
  • Постановка — перед выпуском программного обеспечения в производство мы можем «подготовить» его в среде, близкой к производственной, и наблюдать за его поведением в условиях, близких к производственной.
  • Нагрузочное тестирование — когда вам нужен выделенный набор ресурсов для создания и тестирования системы под нагрузкой.
  • Производство — где система может работать и влиять на конечного пользователя.

Что включает в себя «среда» для конвейеров Spark?

Как минимум, среда включает в себя:

  • Искровые кластеры (или возможность их раскручивать)
  • Входные данные (или инструменты, которые могут генерировать данные)
  • Приемники для записи выходных данных конвейера (например, корзина S3 или базы данных)

Кроме того, среда может включать:

  • Другие трубопроводы (например, восходящие и нисходящие)
  • Каталоги данных (например, хранилище метаданных улья)
  • Инструменты планировщика/оркестратора
  • Инструменты мониторинга
  • Более

Разумная отправная точка (пример для Databricks на AWS)

  • Локальный — Spark в памяти (чтение из локальной файловой системы или S3).
  • Разработка/тестирование, стадия, рабочая версия — отдельные блоки данных рабочие области, использующие один и тот же блок данных и аккаунт AWS.
  • Хранилище — отдельные сегменты для каждой рабочей области.
    Все среды могут иметь права только на чтение для корзин друг друга.
  • Мониторинг — одна и та же учетная запись для подготовки и производства с разными метками метрик и возможностью фильтрации по среде на информационных панелях.

Чтобы увеличить количество команд, вы можете открыть отдельные рабочие пространства для разработки и тестирования для каждой команды.

Изоляции на уровне среды недостаточно

Часто сред гораздо меньше, чем конвейеров, использующих их.

Например, мы можем работать над двумя разными конвейерами в разработке и тестировании или одновременно запускать две версии одного и того же конвейера в рабочей среде.

Из этого следует, что большая часть изоляции между запусками конвейера должна исходить из того, как мы развертываем конвейеры в среде.

Развертывание

Что включает в себя развертывание

Развертывание конвейера в среде обычно включает:

  • Имя развернутого конвейера
  • Указатель на двоичный файл, который нам нужно запустить (например, пакеты Python в частном репозитории PyPi)
  • Команда для запуска двоичного файла
  • Аппликативная конфигурация (например, аргументы командной строки)
  • Конфигурация кластера — включая компьютеры, роли, конфигурации искр, сценарии инициализации, env. переменные и т.д.
  • Конфигурация планировщика
  • Конфигурация мониторинга и оповещения, когда это уместно

Развертывание как конфигурация

Большинство этих вариантов развертывания строго структурированы.

Большинство облачных платформ Spark имеют структурированные API для развертывания, которые принимают некоторые вариации вышеуказанной информации в качестве полезной нагрузки JSON.

Иерархическое управление конфигурацией

Чтобы избежать повторения конфигурации каждый раз, когда мы развертываем конвейер, мы можем захотеть управлять конфигурацией иерархически.

Конфигурации нижнего уровня являются частью учетной записи или среды и размещаются там при первой настройке (и редко изменяются).

Остальными параметрами конфигурации, специфичными для конвейера, необходимо управлять вместе с кодовой базой конвейера, поскольку они могут меняться в процессе разработки логики.

Прикладная конфигурация

Практически каждый конвейер должен предоставлять какой-либо способ внешней настройки своего поведения.

Например:

  • Управление входными и выходными путями на S3
  • Установка ограничения на запрос в тестовой среде
  • Установка размера временного окна для выполнения агрегирования
  • и т. д.

Если количество параметров, которыми вы хотите управлять, велико, имеет смысл организовать их в файлы конфигурации.

Следуя концепции иерархической конфигурации, мы можем захотеть иметь механизм конфигурации, который позволяет:

  • Специализация конфигураций, управляемых в файлах (например, для каждой среды)
  • Переопределение значений через аргументы командной строки

Отличным инструментом, который позволяет все это и многое другое, является hydra.cc.

my_pipeline -c /path/to/hydra/configuration/root +env=test ++job_config.some_key="x"

Руководство по гидре выходит за рамки этого сообщения в блоге.

Механика развертывания

Процедура развертывания должна выполнить следующие действия:

  • Создайте пакет (необязательно)
  • Разверните пакет в доступном месте для кластера, например. Репозиторий PyPi (необязательно)
  • Развертывание аппликативных файлов конфигурации в доступном месте для кластера (например, в общей файловой системе, доступной для драйвера Spark).
  • «Скомпилируйте» конфигурацию времени выполнения для конкретного развертывания этого конвейера — определите кластер, имя конвейера, правильную команду для вызова конвейера, конфигурацию планировщика, конфигурацию мониторинга и т. д.
  • Вызов API для создания кластера и правильной регистрации конвейера в среде со всей вышеуказанной информацией.

Отличным инструментом для выполнения всего вышеперечисленного на Databricks является dbx.

  • Развертывание вызывается из командной строки.
dbx deploy --environment test --workflow my_pipeline --config ....
  • Он поставляется с файлом .yaml, который управляет командой сборки, а также большинством других аспектов конфигурации среды выполнения.
  • Как правило, он организует развертывание по среде и конвейеру.
  • Большая часть аппликативной конфигурации для каждого развертывания может быть переопределена командой развертывания.
  • Вы можете добавить более динамичное поведение, например. переопределить имя созданного задания Databricks с помощью шаблонов Jinja и переменных среды.

Структура проекта с конфигурацией на основе dbx и Hydra

.
├── .github
│   └── workflows
│       ├──ondeploy.yaml
|       └──onpush.yaml
├── .dbx
│   └──project.json
├── conf
│   ├── deployment.yaml      #dbx
│   └── tasks
│       └── my_task          #hydra
│           ├── my_task.yaml  
|           ├── test
│           ├── stage
|           └── prod
├── my_pipeline
│   ├── entrypoint.py
│   └── tasks
│       └── my_task.py
├── tests
│   ├── data
│   ├── unit
│   │   └── tasks
│   │       └── my_task_test.py
│   └── system
│       └── system_test.py
├── .pre-commit-config.yaml
├── pyproject.toml
├── setup.cfg
├── setup.py
└── README.md

Краткое содержание

В этом посте мы рассмотрели основные проблемы подготовки к работе с конвейерами данных на основе Spark в облаке.

Ни одна из приведенных выше сведений не может считаться новой сама по себе, но я надеюсь, что она может сэкономить читателю некоторые усилия как при выявлении, так и при разработке решений этих общих проблем.

Следующая часть будет посвящена запуску и эксплуатации конвейеров в производственной среде.