Планировщик воздушного потока не получает задание

Я создал новый Dag со следующими аргументами:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'catchup': False,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

dag = DAG(
    'sample_dag',
    default_args=default_args,
    description='sample dag',
    schedule_interval="44 * * * *")

Но планировщик не берет даг, когда приходит время. И он работает нормально, когда я запускаю его вручную. Есть ли что-то, что мне здесь не хватает?

Кроме того, планировщик выдавал ошибку, когда выражение cron было "*/5 * * * *"

CroniterBadCronError: Exactly 5 or 6 columns has to be specified for iteratorexpression.

Но выражение cron выглядит хорошо.


person pkgajulapalli    schedule 12.10.2018    source источник
comment
Каков результат airflow list_dags   -  person Meghdeep Ray    schedule 12.10.2018
comment
sample_dag является частью вывода   -  person pkgajulapalli    schedule 12.10.2018


Ответы (1)


Причина этого в том, что [time the dag runs] = start_date + schedule_interval. Поэтому, если вы установите для своего start_date что-то динамическое, то dag никогда не будет выполняться, поскольку start_date продолжает увеличиваться с... ну... временем.

Это объясняется здесь также еще один вопрос здесь в стеке, на который тоже есть ответ, они, вероятно, объясняют его лучше, чем я.

Вы должны изменить свой start_date на что-то статичное, а не datetime.now()

Если вы не хотите заполнять свой даг, вам нужно установить catchup=False в качестве параметра дага. так что-то вроде следующего:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'sample_dag',
    catchup=False,
    default_args=default_args,
    description='sample dag',
    schedule_interval="44 * * * *"
    )
person Simon D    schedule 12.10.2018
comment
Допустим, даг должен запускаться каждые 5 минут, и его не нужно заполнять... тогда как мне указать start_date как? - person pkgajulapalli; 12.10.2018
comment
Просто установите его на какое-то произвольное время в прошлом и убедитесь, что у вас есть catchup=false в качестве параметра вашего дага (а не default_args, как вы сделали). Пример здесь — › airflow.apache.org/scheduler.html#backfill-and- наверстывание - person Simon D; 12.10.2018