очистить восходящую задачу в воздушном потоке внутри dag

у меня задача в даге воздушного потока. у него есть три дочерних задачи. К сожалению, бывают случаи, когда эта родительская задача завершается успешно, но два из трех дочерних элементов завершаются неудачно (и повторная попытка для дочерних элементов их не исправит).

он требует, чтобы родитель повторил попытку (даже если она не потерпела неудачу).

поэтому я покорно перехожу в графическое представление запуска dag и «очищаю» эту родительскую задачу и все последующие задачи (+ рекурсивные).

есть ли способ сделать это внутри самого дага?


person yee379    schedule 17.12.2017    source источник


Ответы (4)


Если ваши задачи являются частью вложенного тега, вызов dag.clear() в on_retry_callback SubDagOperator должен помочь:

SubDagOperator(
    subdag=subdag,
    task_id="...",
    on_retry_callback=lambda context: subdag.clear(
        start_date=context['execution_date'],
        end_date=context['execution_date']),
    dag=dag
)
person Christoph Hösler    schedule 21.03.2018

У нас была аналогичная проблема, которую мы решили, поместив задачу с зависимостями, которые мы хотим повторить, во вспомогательный тег. Затем, когда вспомогательный тег повторяет попытку, мы очищаем состояние задач вспомогательного тега с помощью on_retry_callback, чтобы все они снова запускались.

sub_dag = SubDagOperator(
    retry_delay=timedelta(seconds=30),
    subdag=create_sub_dag(),
    on_retry_callback=callback_subdag_clear,
    task_id=sub_dag_name,
    dag=dag,
)

def callback_subdag_clear(context):
    """Clears a sub-dag's tasks on retry."""
    dag_id = "{}.{}".format(
        context['dag'].dag_id,
        context['ti'].task_id,
    )
    execution_date = context['execution_date']
    sub_dag = DagBag().get_dag(dag_id)
    sub_dag.clear(
        start_date=execution_date,
        end_date=execution_date,
        only_failed=False,
        only_running=False,
        confirm_prompt=False,
        include_subdags=False
    )

(изначально взято отсюда https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117)

person randal25    schedule 03.08.2018

Это не дает прямого ответа на ваш вопрос, но я могу предложить лучший обходной путь:

default_args = {
    'start_date': datetime(2017, 12, 16),
    'depends_on_past': True,
}

dag = DAG(
    dag_id='main_dag',
    schedule_interval='@daily',
    default_args=default_args,
    max_active_runs=1,
    retries=100,
    retry_delay= timedelta(seconds=120)
)

Установите depends_on_past в значение True в DAG.

Затем в задачах этого дага ограничьте повторные попытки с помощью повторных попыток.

  DummyOperator(
        task_id='bar',
        retries=0
        dag=child)

Таким образом, группа доступности базы данных помечается как сбойная при сбое любой задачи. Затем DAG будет повторен.

person x97Core    schedule 25.12.2017
comment
Насколько я могу судить, это не работает. Согласно документации, depends_on_past будет блокировать последующие запросы задач до тех пор, пока текущая задача не будет завершена. - person jasonthomas; 03.04.2018

Мы выбрали метод clear_task_instances из taskinstance:

@provide_session
def clear_tasks_fn(tis,session=None,activate_dag_runs=False,dag=None) -> None:
     """
     Wrapper for `clear_task_instances` to be used in callback function
     (that accepts only `context`)
    """

    taskinstance.clear_task_instances(tis=tis,
                                      session=session,
                                      activate_dag_runs=activate_dag_runs,
                                      dag=dag)

def clear_tasks_callback(context) -> None:
    """
    Clears tasks based on list passed as `task_ids_to_clear` parameter

    To be used as `on_retry_callback`
    """

    all_tasks = context["dag_run"].get_task_instances()
    dag = context["dag"]
    task_ids_to_clear = context["params"].get("task_ids_to_clear", [])

    tasks_to_clear = [ ti for ti in all_tasks if ti.task_id in task_ids_to_clear ]

    clear_tasks_fn(tasks_to_clear,
               dag=dag)

Вам нужно будет предоставить список задач, которые вы хотите очистить при обратном вызове, например, для любой дочерней задачи:

DummyOperator('some_child',
              on_retry_callback=clear_tasks_callback,
              params=dict(
                  task_ids_to_clear=['some_child', 'parent']
              ),
              retries=1
)
person Yoni M    schedule 16.08.2020