Как мне начать работу с Airflow, создав группу DAG, которая будет вызывать код Python, который у меня есть в другом репо (по расписанию)?

У меня есть репозиторий git на моем локальном компьютере здесь с некоторым кодом python c: \ repos \ myrepo \ src \ test.py ‹== сценарий python. Я хочу, чтобы Airflow запускался / выполнялся по расписанию

Он размещен на гитхабе.

У меня установлен и запущен воздушный поток (локальная установка) на экземпляре EC2. Я могу получить доступ к веб-странице на моем локальном компьютере разработчика: http: //: и войти в консоль воздушного потока.

Я git клонировал код в экземпляре EC2

Теперь я хочу, чтобы воздушный поток запускал скрипт python (test.py) на повторяющейся основе (например, один раз в день, например, в определенное время).

Как мне это сделать? Меня завели в тупик с текущими инструкциями.


Подробности:

Я зашел на airflow.com и на страницу установки: https://airflow.apache.org/docs/apache-airflow/stable/start/index.html

Есть ссылка: [Быстрый запуск]

Я нажал там:

Я щелкнул: запуск воздушного потока локально (установлен на экземпляре EC2, которого нет в Docker)


Мне удалось перейти на веб-страницу / URL-адрес

Я включил example_bash_operator и example_python_operator и щелкнул внутри, чтобы посмотреть на «‹ ›Код»


=== ›Получите это:

На данный момент я не приблизился к пониманию того, что мне нужно сделать, чтобы Airflow выполнял код в репо, которое у меня есть по расписанию (test.py).

шаг за шагом, что мне нужно сделать, чтобы создать новое задание, которое будет выполнять мой код?

Я не вижу внешнего кода вызова этого образца DAG (код в другом репо). Весь код Python, который должен быть выполнен, содержится в примере.

В инструкциях есть огромные дыры, чтобы помочь кому-то быстро встать и уйти.


На домашней странице Airflow: http: //: / home

Нет [+] Добавить DAG (нет кнопки с плюсом) для добавления DAG. Это идея?

Кроме того, мне нужна помощь в следующем: Это было бы полезно для начала, но в конечном итоге мне нужно программно развертывать задания на сервере.

Любая помощь, которая поможет мне перебраться через этот каньон, определенно поможет. Я не знаю, должен ли я добавить код Airflow DAG к моему существующему репо (обертывание моего кода test.py с примером кода DAG, просто потерянным здесь

или должен ли я создать репо «airflow /», поместить туда код, упаковать мой код как библиотеку, импортировать и т. д. и позвонить оттуда.


person user10664542    schedule 21.05.2021    source источник


Ответы (1)


Шаг 1. Найдите $ AIRFLOW_HOME в файле airflow.cfg, который должен указать путь к папке с вашими dags. Обычно он находится в / airflow / dags /

Шаг 2. В вашем каталоге / dags добавьте файл с именем test_dag.py со следующим кодом

import time
from repos.myrepo.src.test import test_function # this how I would recommend you run your python code in the dag but its upto you if you want to execute a file 
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

 dag = DAG('dag_test',
    default_args=default_args,
    description='run_python',
    schedule_interval= None,
)
start= DummyOperator(task_id='start', dag=dag,)

run_this = PythonOperator(
    task_id='run_pythoncode',
    python_callable=test_function,
    dag=dag,
)
end= DummyOperator(task_id='end', dag=dag,)

start >> run_this >> end

Шаг 3. После сохранения тега перейдите на веб-сервер с помощью браузера и перейдите на вкладку DAG, этот вновь созданный DAG должен находиться на вкладке приостановки, слева от тега будет кнопка включения / выключения. Нажмите, чтобы включить даг. Как только он будет включен, нажмите на триггер или кнопку воспроизведения. [Обратите внимание, что любые ошибки, сделанные в файле DAG (test_dag.py), будут отображаться в верхней части вашей страницы DAG в Airflow]

Шаг 4. *** Альтернативный подход *** к вышеупомянутому DAG. Допустим, вы хотите запустить файл python из репозитория git, а не импортировать его в код DAG.

import time
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
}

 dag = DAG('dag_test',
    default_args=default_args,
    description='run_python',
    schedule_interval= None,
)
start= DummyOperator(task_id='start', dag=dag,)

run_this_as_a_file = BashOperator(
    task_id='run_python_code_from_repo',
    bash_command='python c:\\repos\\myrepo\\src\\test.py',
    dag=dag,
)
end= DummyOperator(task_id='end', dag=dag,)

start >> run_this_as_a_file >> end
person Sami Rehman    schedule 21.05.2021