Создайте уникальное имя файла и получите доступ к этому файлу во всех задачах воздушного потока.

Можем ли мы создавать уникальное имя файла каждый раз, когда запускается даг воздушного потока, и получать доступ к этому файлу из всех задач? Я попытался создать глобальную переменную (output_filename) и добавить к ней метку времени. Но когда я когда-либо получаю доступ к этому имени файла в задачах, каждая задача генерирует другое имя файла, поскольку она вычисляет отметку времени в каждой задаче. Ниже приведен пример кода:

table_name = 'Test_ABC'
start_date = datetime.now()
cur_tmpstp = start_date.strftime('%Y_%m_%d')

output_filename = table_name + "_" + cur_tmpstp + ".csv"
S3_landing_path = "s3://abc/"

def clean_up():
    if os.path.exists(output_filename):
        os.remove(output_filename)


task_1 = BashOperator(
    task_id='task_1',
    bash_command="aws s3 cp %s %s/ " %(output_filename, S3_landing_path, ),
    dag=dag)

task_2_cleanup = PythonOperator(
    task_id='task_2_cleanup',
    python_callable=clean_up,
    dag=dag)

У нас есть другие задачи, в которых мы должны получить доступ к output_filename. Как мы можем получить доступ к глобальной переменной output_filename во всех задачах?


person ak86    schedule 19.05.2017    source источник


Ответы (2)


Если вам нужна только метка времени с детализацией по дням, вы можете использовать переменные по умолчанию с шаблонами. Некоторые примеры таких переменных (взяты с http://airflow.readthedocs.io/en/latest/code.html#default-variables)

{{ ds }}    the execution date as YYYY-MM-DD
{{ ds_nodash }}     the execution date as YYYYMMDD
{{ execution_date }}    the execution_date, (datetime.datetime)
person Him    schedule 14.06.2017
comment
текущая ссылка на документы — airflow.readthedocs.io/en/latest/macros.html - person El Ruso; 11.04.2019
comment
Новая текущая ссылка на документы :) airflow.apache.org/docs/ стабильно/ - person LEC; 11.08.2020

Если вам нужна отметка времени с точностью до времени, можно использовать глобальные переменные и задачу с оператором python:

DAG_NAME = 'Some DAG name'

ts = Variable.get(f"{DAG_NAME}_ts", default_var=None)

def generate_ts(*args, **kwargs):
    ts = datetime.now().isoformat()
    Variable.set(f"{DAG_NAME}_ts", ts)

generate_ts_task = PythonOperator(
    task_id='generate_ts',
    python_callable=generate_ts,
    dag=dag,
)
person Dmitrii Stebliuk    schedule 13.11.2019