Python Luigi — продолжить выполнение внешней задачи, если она удовлетворена

Я работаю над конвейером Luigi, который проверяет, существует ли файл, созданный вручную, и если да, то переходит к следующим задачам:

import luigi, os

class ExternalFileChecker(luigi.ExternalTask):
    task_namespace='MyTask'
    path = luigi.Parameter()
    def output(self):
        return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))

 class ProcessExternalFile(luigi.Task):
      task_namespace='MyTask'
      path = luigi.Parameter()

      def requires(self):
          return ExternalFileChecker(path=self.path)

      def output(self):
          dirname = self.path
          outfile = os.path.join(dirname, 'processedfile.txt')
          return luigi.LocalTarget(outfile)

      def run(self):
          #do processing

if __name__ == '__main__':
      path = r'D:\MyPath\luigi'
      luigi.run(['MyTask.ProcessExternalFile','--path', path,\
      '--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
      '--worker-keep-alive'])

Я хочу, чтобы luigi продолжал работу после того, как я создал файл руководства и вставил его в путь. Когда я это делаю, вместо того, чтобы найти файл и продолжить выполнение задачи, он перепроверяет наличие новой задачи каждые несколько секунд:

DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)

По прошествии значительного времени (примерно 15-20 минут) luigi найдет файл, после чего он сможет продолжить работу по своему усмотрению. Что я могу сделать, чтобы предотвратить эту задержку? Я хочу, чтобы Луиджи продолжил работу, как только файл появится.


person Johan    schedule 23.06.2016    source источник


Ответы (1)


Несколько вещей, о которых следует помнить:

  1. Рабочий поток Luigi не завершится, пока не будет запущена хотя бы одна задача (или, если keep_alive = True, в этом случае он завершится, когда больше не будет ожидающих выполнения задач).
  2. Существует логика повторных попыток для невыполненных задач с интервалом повторных попыток по умолчанию, равным 15 минутам.
  3. Логика повтора работает следующим образом. После указанного интервала повторных попыток планировщик забудет об ошибке задачи (аналогично нажатию кнопки «прощать ошибки» в пользовательском интерфейсе) и изменит статус задачи на ожидание. В следующий раз, когда воркер запрашивает работу у планировщика, эта задача может быть назначена воркеру.
  4. Незавершенные внешние задачи считаются FAILED с учетом логики повторных попыток.
  5. Поведение повторных попыток для внешних задач управляется параметром конфигурации retry_external_tasks в разделе [worker].

Я думаю, что вы наблюдаете что-то вроде этого. Ваш конвейер работает, задача ProcessExternalFile не выполняется, затем вы добавляете файл, задача остается НЕУДАЧНОЙ в течение retry_delay, затем, наконец, она становится ОЖИДАЮЩЕЙ, и рабочему процессу снова дается эта задача, после чего он обнаруживает файл и задачу. становится ПОЛНЫМ.

Является ли это желаемым поведением, зависит от вас. Если вы хотите, чтобы файл был найден быстрее, вы можете изменить интервал повтора. Или вы можете сделать бесконечный цикл while в методе run и периодически проверять файл, а при обнаружении выйти из цикла. Вы также можете настроить Luigi, чтобы полностью отключить логику повторных попыток.

person user443854    schedule 19.04.2018