Сделайте сбой динамической задачи Луиджи некритичным

У меня есть рабочий процесс luigi, который загружает кучу больших файлов через ftp и помещает их на s3.

У меня есть одна задача, которая читает список файлов для загрузки, а затем создает кучу задач, которые фактически выполняют загрузки

Идея состоит в том, что результатом этого рабочего процесса является один файл, содержащий список успешных загрузок, при этом любые неудачные загрузки будут повторяться при следующем запуске на следующий день.

Проблема в том, что в случае сбоя какой-либо из задач загрузки список успешных загрузок никогда не создается.

Это связано с тем, что динамически создаваемые задачи становятся требованиями основной задачи, которая их создает и составляет список из их выходных данных.

Есть ли способ сделать сбои этих задач загрузки несущественными, чтобы список составлялся за вычетом результатов сбойных задач?

Пример кода ниже, GetFiles - это задача, которую мы вызываем из командной строки.

class DownloadFileFromFtp(luigi.Task):
sourceUrl = luigi.Parameter()

def run(self):
    with self.output().open('w') as output:
        WriteFileFromFtp(sourceUrl, output)

def output(self):
    client = S3Client()
    return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)


@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):

def run(self):

    with self.input().open('r') as fileList:
        files = json.load(fileList)

        tasks = []
        taskOutputs = []

        for file in files:
            task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
            tasks.append(task)
            taskOutputs.append(task.output())

        yield tasks

        successfulDownloads = MakeSuccessfulOutputList(taskOutputs)

    with self.output().open('w') as out:
        json.dump(successfulDownloads, out)

def output(self):
    client = S3Client()
    return S3Target(path='successfulDownloads.json', client=client)

person Jonny Nobody    schedule 08.01.2018    source источник


Ответы (2)


ЭТОТ ОТВЕТ, ВЕРОЯТНО НЕПРАВИЛЬНЫЙ - ПРОВЕРЬТЕ КОММЕНТАРИИ

Я читал документацию несколько раз и не обнаружил никаких указаний на такие вещи, как некритические сбои. При этом такого поведения можно легко добиться, переопределив _1 / a> в DownloadFileFromFtp, но при этом можно использовать DownloadFileFromFtp.output в GetFiles.run.

При замене с помощью return True задача DownloadFileFromFtp будет выполнена успешно независимо от успешности загрузки.

class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

    def complete(self,):
        return True

Однако обратите внимание, что вы также можете использовать более сложную логику в этом complete методе - например, сбой, только если задача встретила конкретный сетевой сбой во время выполнения.

person Daniel    schedule 23.01.2018
comment
Это предотвратит запуск задачи, даже если она никогда не запускалась. Когда задача запланирована, luigi сначала проверяет, завершены ли ее зависимости, вызывая complete для каждой зависимости, указанной в requires. Если complete возвращает True для зависимости, эта зависимость не запускается. - person Charles Langlois; 10.04.2019
comment
Единственный способ, который я вижу для реализации таких необязательных зависимостей, - это убедиться, что необязательная задача никогда не завершается с ошибкой, перехватывая все исключения в методе run (вы все равно, вероятно, захотите зарегистрировать эти исключения или обозначить сбой каким-либо образом в target, и, возможно, использовать цель, которая знает об ошибках и возвращает False из своего exists метода, если задача не удалась). - person Charles Langlois; 10.04.2019
comment
Я не знал об этом, это поведение где-то задокументировано? В любом случае, это, вероятно, можно было бы исправить, если бы метод run всегда устанавливал логическое значение экземпляра как True, а затем метод complete для возврата этого значения, указывая, был ли метод run уже вызван или нет. Я попробую это позже, и отредактирую свой awnser, если это так. - person Daniel; 10.04.2019
comment
Установка переменной в памяти для отслеживания того, была ли запущена задача, будет работать, пока вы не попытаетесь использовать несколько рабочих Luigi. Поскольку рабочие не разделяют память, каждый из них будет пытаться выполнить задачу. Луиджи ожидает, что несколько рабочих будут координировать свою работу, проверив цель, к которой они все могут получить доступ, например, файл или базу данных. - person matmat; 15.10.2019
comment
@matmat Конечно, в этом есть смысл. Я отредактировал ответ, чтобы добавить отказ от ответственности - person Daniel; 15.10.2019

Несколько лет спустя вы, должно быть, нашли ответ, но вот кое-что, что может помочь.

class DownloadFileFromFtp(luigi.Task):
      sourceUrl = luigi.Parameter()

      def run(self):
           with self.output().open('w') as output:
             WriteFileFromFtp(sourceUrl, output)
      
      def on_failure(self, exception):
          #If the task fails for any reason, 
          #then just indicate the task as completed.
          #From the docs, exception is a string, so you can easily.

          if "FileNotFound" in exception:
              return self.complete(ignore=True)
          return self.complete(ignore=False)

      def complete(self, ignore=False):
          return ignore

      def output(self):
          client = S3Client()
          return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
person John Kitonyo    schedule 25.03.2021