Может ли планировщик задач Python Луиджи обнаруживать косвенные зависимости?

Краткая версия:

Есть ли в Python планировщик задач, который может делать то, что делает gmake? В частности, мне нужен планировщик задач, рекурсивно разрешающий зависимости. Я посмотрел на Луиджи, но, похоже, он разрешил только прямые зависимости.

Полная версия:

Я пытаюсь создать рабочий процесс, который обрабатывает множество файлов данных в заранее определенном порядке, более поздние задачи могут напрямую зависеть от результатов некоторых более ранних задач, но, в свою очередь, правильность этих результатов зависит от еще более ранних задач. .

Например, давайте рассмотрим карту зависимостей, подобную следующей:

A <- B <- C

Когда я запрашиваю результат из задачи C, Луиджи автоматически планирует B, а затем, поскольку B зависит от A, он будет планировать A. Таким образом, окончательный порядок выполнения будет [A, B, C]. Каждая задача будет создавать официальный выходной файл как признак успешного выполнения. Это нормально для первого запуска.

Теперь предположим, что я ошибся во входных данных для задачи A. По-видимому, мне нужно заново запустить всю цепочку. Однако просто удалить выходной файл из A не получится. Потому что Луиджи видит выходные данные B и C и приходит к выводу, что требование для задачи C выполнено и запусков не требуется. Мне нужно удалить выходные файлы из ВСЕХ задач, которые зависят от A, чтобы они были запущены снова. В простом случае мне нужно удалить все выходные файлы из A, B и C, чтобы Луиджи обнаружил изменение, внесенное в A.

Это очень неудобная функция. Если у меня есть десятки или сотни задач, которые имеют довольно сложные зависимости друг от друга, будет действительно сложно сказать, какие из них будут затронуты, когда одну из задач нужно будет повторно запустить. Для планировщика задач, обладающего способностью разрешать зависимости, я ожидаю, что Луиджи сможет действовать аналогично GNU-Make, где зависимости проверяются рекурсивно, а конечная цель будет перестроена при изменении одного из самых глубоких исходных файлов.

Мне было интересно, может ли кто-нибудь дать несколько предложений по этому вопросу. Мне не хватает некоторых ключевых функций в Luigi? Существуют ли другие планировщики задач, действующие как gmake? Меня особенно интересуют пакеты на основе Python, и я бы предпочел, чтобы они поддерживали Windows.

Большое спасибо!


person Lei    schedule 26.02.2017    source источник


Ответы (2)


Это кажется возможным, если переопределить полный метод для ваших задач. Вам придется применять это на всем протяжении графа зависимостей.

def complete(self):
    outputs = self.flatten(self.output())
    if not all(map(lambda output: output.exists(), outputs)):
        return False
    for task in self.flatten(self.requires()):
        if not task.complete():
            for output in outputs:
                if output.exists():
                    output.remove()
            return False
    return True
person MattMcKnight    schedule 26.02.2017
comment
Привет, @MattMcKnight! Большое спасибо за указание на полный метод! Я попробую это решение. Однако мне интересно, почему Луиджи не делает этого по умолчанию? Использование цикла Python for для ручного разрешения зависимостей может быть не очень эффективным, и обычный пользователь, такой как я, может делать ошибки и / или делать что-то неоптимально. Но в любом случае большое спасибо! :) - person Lei; 27.02.2017
comment
Для работы приведенного выше кода необходимо внести несколько исправлений. Во-первых, метод flatten определен Луиджи, поэтому его нужно называть как self.flatten. Во-вторых, я счел лучше повторно использовать метод complete по умолчанию в конце, чтобы проверить статус самой задачи, поэтому я написал return super(MyTask, self).complete(), предполагая, что унаследованный класс называется MyTask. Однако я все еще получаю сообщение об ошибке «Файл уже существует» при выполнении задач нисходящего потока, потому что их выходные файлы не удаляются. Есть идеи, как автоматически обновлять эти выходные файлы? - person Lei; 27.02.2017
comment
Извиняюсь за то, что не попробовал код, просто хотел дать общее представление о подходе. Я полагаю, что помимо возврата False из полного, нужно было бы удалить существующий вывод. Метод Task.complete () не так уж и сложен. github.com/spotify/luigi/blob/master/luigi/ task.py # L526 - person MattMcKnight; 27.02.2017
comment
Было несколько запросов на аналогичные процессы очистки. github.com/spotify/luigi/issues/595#issuecomment-194323344 - person MattMcKnight; 27.02.2017
comment
Привет, @MattMcKnight, спасибо за обновление! Я обнаружил, что у Luigi LocalTarget есть метод удаления, который можно использовать для удаления файла. Итак, вместо использования метода os.remove я вызову output.remove (). Я приму этот ответ и опубликую свою последнюю протестированную версию. Большое спасибо за Вашу помощь! :) - person Lei; 28.02.2017
comment
Привет, @MattMcKnight, я задал этот вопрос о репозитории Луиджи на GitHub, и кто-то поднял вопрос эффективности. Как и в ответе прямо сейчас, выходы текущей задачи проверяются последними после того, как были проверены все требуемые задачи. Это может быть неоптимальным. Не могли бы вы изменить ответ, чтобы сначала проверить текущий уровень вывода, а затем выполнить поиск зависимостей более высокого уровня? Кроме того, было бы неплохо, если бы вы могли обернуть метод целым унаследованным классом и упомянуть, что все конкретные классы задач могут быть в дальнейшем унаследованы от этого модифицированного базового класса. - person Lei; 02.03.2017
comment
Кстати, я подумал, есть ли способ сканировать пул задач только один раз и назначать флаг каждой задаче. Таким образом, когда вызывается его метод complete, статус можно легко определить без вызова метода complete всех более глубоких задач. Прямо сейчас ответ имеет эффективность наихудшего случая ~ O (n!), Что может быть очень медленным, когда n велико. Я подумаю об этом подробнее. Спасибо! - person Lei; 02.03.2017
comment
Спасибо! Я принял ответ! Спасибо вам за помощь! :) - person Lei; 04.03.2017

На самом деле это неудобно, и d6tflow проверяет полноту всех восходящих зависимостей, а не только наличие вывода для TaskC. Если вы сбросите TaskA, TaskC также будет неполным и автоматически перезапустится.

# reset TaskA => makes TaskC incomplete
TaskA().invalidate() 
d6tflow.preview(TaskC()) # all tasks pending

Для получения дополнительных сведений см. Полный пример ниже и d6tflow docs.

import d6tflow
import pandas as pd

class TaskA(d6tflow.tasks.TaskCachePandas):  # save dataframe in memory

    def run(self):        
        self.save(pd.DataFrame({'a':range(10)})) # quickly save dataframe

class TaskB(d6tflow.tasks.TaskCachePandas):

    def requires(self):
        return TaskA() # define dependency

    def run(self):
        df = self.input().load() # quickly load required data
        df = df*2
        self.save(df)

class TaskC(d6tflow.tasks.TaskCachePandas):

    def requires(self):
        return TaskB()

    def run(self):
        df = self.input().load() 
        df = df*2
        self.save(df)

# Check task dependencies and their execution status
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
   └─--[TaskB-{} (PENDING)]
      └─--[TaskA-{} (PENDING)]
'''

# Execute the model training task including dependencies
d6tflow.run(TaskC())

'''
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 TaskA()
    - 1 TaskB()
    - 1 TaskC()
'''

# all tasks complete
d6tflow.preview(TaskC())

'''
└─--[TaskC-{} (COMPLETE)]
   └─--[TaskB-{} (COMPLETE)]
      └─--[TaskA-{} (COMPLETE)]
'''

# reset TaskA => makes TaskC incomplete
TaskA().invalidate() 
d6tflow.preview(TaskC())
'''
└─--[TaskC-{} (PENDING)]
   └─--[TaskB-{} (PENDING)]
      └─--[TaskA-{} (PENDING)]
'''
person citynorman    schedule 21.02.2019
comment
Не могли бы вы вкратце прокомментировать эффективность алгоритма, используемого библиотекой для разрешения рекурсивных зависимостей? Каков типичный размер пула задач, который эта библиотека может обрабатывать без значительной / заметной задержки? - person Lei; 27.03.2019
comment
Не уверен, на практике это никогда не было проблемой, и я построил с ее помощью крупномасштабные системы машинного обучения. Работа происходит внутри задач, библиотека только координирует. Один пользователь обучил модели 20K ML. - person citynorman; 31.12.2019