AWS MWAA — Воздушный поток — Соединения нагрузки

Я рассматриваю возможность переноса Airflow с ECS на MWAA, что, по-видимому, хорошо работает. Но с точки зрения CICD есть некоторые ограничения на загрузку соединений. Основываясь на документации, мы не можем устанавливать соединения с помощью командной строки, основываясь на том, что здесь написано: https://docs.aws.amazon.com/mwaa/latest/userguide/access-airflow-ui.html#call-mwaa-apis-cli

Я знаю, что мы можем сделать это с помощью пользовательского интерфейса, но это не так. Кто-нибудь знает, как с этим бороться или есть ли способ сделать это автоматически?

Большое спасибо, Ксави.


person xavy    schedule 03.03.2021    source источник


Ответы (1)


Вы можете добавить шаг в DAG, который программно добавит соединение, если оно еще не присутствует в среде:

def add_connection_callable(**kwargs):
    connection: Connection = Connection(
        conn_id="foo",
        conn_type="HTTP",
        host=kwargs['host'],
        port=kwargs['port']
    )
    session: Session = settings.Session
    db_connection: Connection = session.query(Connection) \
        .filter(Connection.conn_id == "foo") \
        .first()
    if db_connection is None:
        logging.info("Adding connection \"foo\"..")
        session.add(connection)
        session.commit()
    else:
        logging.info("Connection \"foo\" already exists.")

add_connection: PythonOperator = PythonOperator(
    task_id="add_connection",
    python_callable=add_connection_callable,
    op_kwargs={
        "host": "{{ dag_run.conf['Host'] }}",
        "port": "{{ dag_run.conf['Port'] }}"
    },
    dag=dag)
person Hedi Bejaoui    schedule 07.03.2021