Обзор

Значимой тенденцией развития ИТ-бизнеса в настоящее время является готовность к работе с горячими данными, время жизни которых с момента их появления может составлять менее секунды.

Допустим, вы приходите в магазин и берете кредит на покупку телефона. Вы хотите получить кредит на выгодных условиях. И банк хочет дать кредит проверенному клиенту. Временное окно, в котором вам нужны кредитные деньги, относительно короткое. Пример из домена Telecom. У вас закончились деньги, а в это время ваша жена собирается рожать ребенка. Вам нужно вызвать скорую помощь. Доверительный платеж от оператора был бы очень кстати.

Стоимость данных для бизнеса в таких ситуациях крайне важна. Такое положение дел на рынке привело к появлению каппа-архитектуры и потоковой обработки данных, которая должна гарантировать минимальную задержку.

Недавно я писал о нашей системе доставки кода на базе Kubernetes, Airflow и Jenkins.

Среди преимуществ нашей системы:

  1. Чрезвычайно низкие затраты на разработку
  2. Решение всех необходимых задач по доставке кода
  3. Единая точка управления продуктом
  4. Отказоустойчивость и контрольная точка

Для поддержки потоковой передачи мы хотим сохранить тот же способ автоматизации, ту же точку контроля, что и пакетные задания, и свести к минимуму количество используемых ресурсов.

Концепции

Мы решили, что автоматизация потоковых приложений не должна отличаться от автоматизации пакетных приложений. Кроме того, мы хотим собирать Scala-приложение «на лету» из репозитория.

Дело в том, что приложения Spark Streaming блокируют консоль после выполнения spark-submit. опции

spark.yarn.submit.waitAppCompletion = false

позволяет нам добиться spark-submit поведения, идентичного отправке пакетного задания. Соответственно, после отправки такого приложения наш Kubernetes Pod, под которым происходит отправка, схлопывается и освобождает ресурсы.

Далее нам нужно решить, как понять, что задание запущено. Это довольно просто сделать через YARN CLI. Эта команда bash делает это:

while yarn application -list | grep InternationalRoamingApp | grep -e RUNNING -e ACCEPTED; do sleep 10; done && exit 1 > /dev/null 2>&1

Если приложение дает сбой, мы также должны повторно выполнить spark-submit шагов, чтобы перезапустить приложение.

Ремастеринг поколения DAG

Теперь, когда мы узнали, как должны обрабатываться задания Spark. Нам просто нужно добавить поддержку генерации оператора SSH для нашей системы автоматизации, которая позволит проверять состояние задания.

Файлы конфигурации YAML

Новые структуры одношаговых и многошаговых файлов. Реворк позволяет объявить команду для удаленного выполнения (в нашем случае это кластер Hadoop) и дополнительные контейнеры для пода, которые могут подмешать некоторую дополнительную логику (в нашем случае сборка приложения).

Файл одношаговой конфигурации

Многоэтапная настройка

Новое в шаблоне j2 DAG

Метод, который позволяет перезапустить все шаги до неудачного шага:

Преобразование команд для внутреннего использования в Python:

Петли для боковых контейнеров:

Макросы для SSH-оператора, выполняющего удаленную команду:

Результирующий Python-скрипт от рендеринга этого шаблона вы могли видеть в Приложении 1.

Ремастеринг выполнения DAG

На этапе выполнения DAG первым шагом является spark-submit, который запускает модуль, который завершает сборку и отправку. После этого запускается обычный SSH-оператор, который управляет запущенным приложением. В случае сбоя приложения происходит сбой команды оператора SSH и перезапуск потока. Мы могли бы контролировать количество перезапусков и интервал перезапуска нашего потокового приложения.

Новое в шаблоне команды генерации пода j2

Команда позволяет добавлять sidecar-контейнеры. В нашем случае мы используем один для создания проектов Scala.

Результат рендеринга этой команды находится в Приложении 2.

Состав модели ML в приложениях Spark Streaming

Spark позволяет использовать модели как:

  1. UDF-функция
  2. Преобразователь или оценщик внутри конвейера Spark ML.

Наша система позволяет разделить модель и логику подготовки данных, использовать модели как отдельные микросервисы и создавать композиции разными способами.

Приложение 1. Пример генерации DAG

Приложение 2. Пример отображаемой команды