У меня есть даг, который проверяет файлы на FTP-сервере (воздушный поток работает на отдельном сервере). Если файл (ы) существует, он (ы) перемещается в S3 (мы архивируем здесь). Оттуда имя файла передается в задание отправки Spark. Искровое задание обработает файл через S3 (искровый кластер на другом сервере). Я не уверен, нужно ли мне иметь несколько дагов, но вот поток. Что я хочу сделать, так это запустить задание Spark только в том случае, если файл существует в корзине S3.
Я попытался использовать датчик S3, но он терпит неудачу / истекает время ожидания после того, как он соответствует критериям тайм-аута, поэтому для всего dag установлено значение failed.
check_for_ftp_files -> move_files_to_s3 -> submit_job_to_spark -> archive_file_once_done
Я хочу запускать все только после сценария, который выполняет проверку FTP ТОЛЬКО, когда файл или файлы были перемещены в S3.