У меня есть рабочий процесс luigi, который загружает кучу больших файлов через ftp и помещает их на s3.
У меня есть одна задача, которая читает список файлов для загрузки, а затем создает кучу задач, которые фактически выполняют загрузки
Идея состоит в том, что результатом этого рабочего процесса является один файл, содержащий список успешных загрузок, при этом любые неудачные загрузки будут повторяться при следующем запуске на следующий день.
Проблема в том, что в случае сбоя какой-либо из задач загрузки список успешных загрузок никогда не создается.
Это связано с тем, что динамически создаваемые задачи становятся требованиями основной задачи, которая их создает и составляет список из их выходных данных.
Есть ли способ сделать сбои этих задач загрузки несущественными, чтобы список составлялся за вычетом результатов сбойных задач?
Пример кода ниже, GetFiles - это задача, которую мы вызываем из командной строки.
class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()
def run(self):
with self.output().open('w') as output:
WriteFileFromFtp(sourceUrl, output)
def output(self):
client = S3Client()
return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):
def run(self):
with self.input().open('r') as fileList:
files = json.load(fileList)
tasks = []
taskOutputs = []
for file in files:
task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
tasks.append(task)
taskOutputs.append(task.output())
yield tasks
successfulDownloads = MakeSuccessfulOutputList(taskOutputs)
with self.output().open('w') as out:
json.dump(successfulDownloads, out)
def output(self):
client = S3Client()
return S3Target(path='successfulDownloads.json', client=client)