Я работаю над конвейером Luigi, который проверяет, существует ли файл, созданный вручную, и если да, то переходит к следующим задачам:
import luigi, os
class ExternalFileChecker(luigi.ExternalTask):
task_namespace='MyTask'
path = luigi.Parameter()
def output(self):
return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))
class ProcessExternalFile(luigi.Task):
task_namespace='MyTask'
path = luigi.Parameter()
def requires(self):
return ExternalFileChecker(path=self.path)
def output(self):
dirname = self.path
outfile = os.path.join(dirname, 'processedfile.txt')
return luigi.LocalTarget(outfile)
def run(self):
#do processing
if __name__ == '__main__':
path = r'D:\MyPath\luigi'
luigi.run(['MyTask.ProcessExternalFile','--path', path,\
'--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
'--worker-keep-alive'])
Я хочу, чтобы luigi продолжал работу после того, как я создал файл руководства и вставил его в путь. Когда я это делаю, вместо того, чтобы найти файл и продолжить выполнение задачи, он перепроверяет наличие новой задачи каждые несколько секунд:
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)
По прошествии значительного времени (примерно 15-20 минут) luigi найдет файл, после чего он сможет продолжить работу по своему усмотрению. Что я могу сделать, чтобы предотвратить эту задержку? Я хочу, чтобы Луиджи продолжил работу, как только файл появится.