Группы обеспечения доступности баз данных Airflow, не запущенные в Google Cloud Composer: зависимости, блокирующие задачу от получения расписания

Я только что установил среду Cloud Composer на Python 3 и версию образа Composer composer-1.4.0-airflow-1.10.0. Все настройки в остальном "стоковые"; т.е. без переопределения конфигурации.

Я пытаюсь протестировать чрезвычайно простой DAG. Он работает без проблем на моем локальном сервере Airflow, но в Cloud Composer в информационном представлении задачи веб-сервера есть сообщение Dependencies Blocking Task From Getting Scheduled

Зависимости Unknown по следующей причине:

All dependencies are met but the task instance is not running. In most cases this just means that the task will probably be scheduled soon unless:
- The scheduler is down or under heavy load
- The following configuration values may be limiting the number of queueable processes: parallelism, dag_concurrency, max_active_dag_runs_per_dag, non_pooled_task_slot_count

If this task instance does not start soon please contact your Airflow administrator for assistance.

Это происходит независимо от того, запускается ли задача по расписанию или когда я вручную запускаю ее на веб-сервере (я установил все экземпляры задачи на успешное выполнение, прежде чем делать это, чтобы избежать задержек). Я пробовал сбросить планировщик в кубернетах согласно этому ответу, но задачи все еще застревают в расписании.

Кроме того, я заметил, что в моем локальном экземпляре (выполняющем сервер, рабочий процесс и планировщик в разных контейнерах Docker) столбец Hostname в представлении «Экземпляры задач» заполнен, а в Cloud Composer - нет.

Вот DAG, который я запускаю:

from datetime import datetime, timedelta
import random

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'queue': 'airflow',
    'start_date': datetime.today() - timedelta(days=2),
    'schedule_interval': None,
    'retries': 2,
    'retry_delay': timedelta(seconds=15),
    'priority_weight': 10,
}


example_dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)


def always_succeed():
    pass


always_succeed_operator = PythonOperator(
    dag=example_dag,
    python_callable=always_succeed,
    task_id='always_succeed'
)


def might_fail():
    return 1 / random.randint(0, 1)


might_fail_operator = PythonOperator(
    dag=example_dag, python_callable=might_fail, task_id='might_fail'
)


might_fail_operator.set_upstream(always_succeed_operator)

person Eric Fulmer    schedule 19.12.2018    source источник


Ответы (1)


Cloud Composer не поддерживает несколько очередей сельдерея, удалите 'queue' : 'airflow' из аргументов по умолчанию. Это должно решить вашу проблему.

person Feng Lu    schedule 06.01.2019