Как вручную запустить Airflow DAG в определенном каталоге

Я оцениваю, подходит ли Airflow для моих нужд (в биоинформатике). У меня проблемы с моделью Airflow. Конкретно:

  • Где на самом деле выполняется файл DAG? Каков его контекст? Как передать входные данные в файл определения DAG? (Например, я не хочу создавать задачу для каждого файла в каталоге.)
  • Как я могу выполнять группу доступности базы данных на разовой основе? Как передать параметры для построения DAG?

Вот пример того, что я хотел бы выполнить. Скажем, я только что получил некоторые данные в виде каталога, содержащего 20 файлов, доступных в некоторой общей файловой системе. Я хочу выполнить конвейер DAG, который запускает определенную команду bash для каждого из 20 файлов, затем объединяет некоторые результаты и выполняет дальнейшую обработку. Группе DAG нужен путь в файловой системе, а также список файлов в каталоге, чтобы создать задачу для каждого из них.

Мне, вероятно, не нужно передавать метаданные от одной задачи к другой (что, как я понимаю, возможно через XCom), если я могу динамически создать всю группу DAG заранее. Но мне непонятно, как пройти путь к построению DAG.

Другими словами, я бы хотел, чтобы мое определение DAG включало что-то вроде

dag = DAG(...)
for file in glob(input_path):
    t = BashOperator(..., dag=dag)

Как мне получить input_path, если я хочу вручную запустить DAG?

Мне также действительно не нужно планирование в стиле cron.


person Uri Laserson    schedule 26.05.2017    source источник
comment
Аналогичной функциональностью в Luigi было бы указание параметров с помощью интерфейса командной строки.   -  person Uri Laserson    schedule 26.05.2017


Ответы (1)


Что касается input_path, вы можете передать его в DAG, используя переменные Airflow. Пример кода, используемого в файле DAG:

input_path = Variable.get("INPUT_PATH")

Переменные можно импортировать с помощью Airflow cli или вручную через пользовательский интерфейс.

Для этого типа логики следует использовать подтег:

dag = DAG(...) for file in glob(input_path): t = BashOperator(..., dag=dag)

SubDAG идеально подходят для повторяющихся шаблонов. Определение функции, возвращающей объект DAG, является хорошим шаблоном проектирования при использовании Airflow.

person Jorge Almeida    schedule 04.08.2017