Остановить выполнение оставшихся задач в воздушном потоке

У меня есть три задачи t1,t2,t3. каждый вывод задачи является вводом следующей задачи, например, t1 вывод является t2 вводом. После завершения t1 я получаю пустую выходную папку (что может случиться в моем случае, и это приемлемо и помечено t1 как успех), но t2 не удалось получить выходные данные t1, так как нет файлов. Я хочу пометить t2 и t3 как успех, если файлов нет. Как я могу пропустить следующие две задачи.


Я просмотрел документы по воздушному потоку и другие статьи, наткнулся на датчики и метод тыка. Но, не уверен, как поступить с этим.


person Venkata Gogu    schedule 26.06.2018    source источник


Ответы (2)


Вы можете использовать SensorOperator, точнее FileSensorOperator, чтобы проверить, существует ли файл. Затем вы можете использовать soft_fail. arg, чтобы пометить задачи как «пропущенные», если файл не существует. Это позволит группе обеспечения доступности баз данных работать успешно, сохраняя при этом правильную историю того, что произошло при проверке файла.

person andscoop    schedule 26.06.2018
comment
Спасибо, я ценю вашу помощь! Я буду смотреть в него. - person Venkata Gogu; 26.06.2018

Ответ @andscoop хорош, но только для того, чтобы принести больше идей:

Возможное решение 1

Я делаю что-то подобное (зависимости A > B > C), и я решил подход, используя XCOM, который по умолчанию выдвигается предыдущей задачей.

Любое значение, которое возвращает метод execute, сохраняется как сообщение Xcom под ключом return_value. Мы рассмотрим эту тему позже. Источник http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/

# copy&paste it into dags/stackoverflow.py to test it

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from datetime import datetime


dag = DAG('stackoverflow', description='Another Dag',
          schedule_interval='* * * 1 1',
          start_date=datetime(2018, 6, 27), catchup=False)


def do_a(**kwargs):
    # Assuming that your TASK A is not returning a value
    return None


task_a = PythonOperator(task_id='do_a',
                        python_callable=do_a,
                        provide_context=True,
                        dag=dag)


def do_b(**kwargs):
    result_from_a = kwargs['ti'].xcom_pull(task_ids='do_a')
    if result_from_a:
        print("Continue with your second task")
    else:
        print("Send a notification somewhere, log something or stop the job here.")


task_b = PythonOperator(task_id='do_b',
                        python_callable=do_b,
                        provide_context=True,
                        dag=dag)
task_a >> task_b

введите здесь описание изображения введите здесь описание изображения

Возможное решение 2

Ветвление. Более сложным способом (и с использованием лучших практик) вы можете сделать ветвь, чтобы определить следующий шаг/задачу на основе результата t1. Я не могу сделать правильный пример сейчас, но вот 2 источника, чтобы понять, как это работает с примерами:

person skozz    schedule 27.06.2018