Как успешно выйти из задачи на полпути в даге Airflow?

У меня есть даг, который проверяет файлы на FTP-сервере (воздушный поток работает на отдельном сервере). Если файл (ы) существует, он (ы) перемещается в S3 (мы архивируем здесь). Оттуда имя файла передается в задание отправки Spark. Искровое задание обработает файл через S3 (искровый кластер на другом сервере). Я не уверен, нужно ли мне иметь несколько дагов, но вот поток. Что я хочу сделать, так это запустить задание Spark только в том случае, если файл существует в корзине S3.

Я попытался использовать датчик S3, но он терпит неудачу / истекает время ожидания после того, как он соответствует критериям тайм-аута, поэтому для всего dag установлено значение failed.

check_for_ftp_files -> move_files_to_s3 -> submit_job_to_spark -> archive_file_once_done

Я хочу запускать все только после сценария, который выполняет проверку FTP ТОЛЬКО, когда файл или файлы были перемещены в S3.


person sdot257    schedule 24.06.2017    source источник


Ответы (2)


У вас может быть 2 разных группы DAG. У одного есть только датчик S3, и он продолжает работать, скажем, каждые 5 минут. Если он находит файл, запускается второй DAG. Второй DAG отправляет файл на S3 и архивирует, если это сделано. Вы можете использовать TriggerDagRunOperator в первом DAG для запуска.

person Him    schedule 26.06.2017
comment
Что произойдет, если он не найдет файл, не выйдет ли он с кодом ошибки? Следовательно, кому-то нужно будет перезапустить работу, не так ли? - person sdot257; 26.06.2017
comment
Первый DAG (который имеет две задачи, S3Sensor и TriggerDagRunOperator) можно запланировать для запуска каждые пять минут. Это означает, что датчик будет запускаться каждые 5 минут, и, если он находит файл, запускает второй DAG. В противном случае он ничего не делает и запускается повторно через 5 минут. Не имеет значения, выходит ли он с кодом ошибки (вы НЕ должны устанавливать для параметра independent_on_past значение true для первого DAG). - person Him; 26.06.2017

Ответ, который Он дал, сработает. Другой вариант - использовать параметр soft_fail, который есть у датчиков (это параметр от BaseSensorOperator). ЕСЛИ вы установите для этого параметра значение True, вместо сбоя задачи она пропустит ее, и все последующие задачи в ветке также будут пропущены.

См. код воздушного потока для больше информации.

person J.Brouwers    schedule 22.02.2018