Что определяет полное задание Луиджи?

Я работаю над созданием своего первого конвейера Luigi, и в настоящее время я тестирую задачи индивидуально, прежде чем строить свои зависимости. Во время тестирования я использую версию следующего основного метода для построения задачи:

if __name__ == "__main__":

    headers = dict()
    headers["Content-Type"] = "application/json"
    headers["Accept"] = "application/json"

    luigi.build[(CSVValidator(jsonfile = '/sample_input/sample_csv.json',
                docfile = None,
                error_limit = 2,
                order_fields = 3,
                output_file = 'validation_is_us.txt',
                header = headers)])

    luigi.run()

Вот как выглядит мой csv_validator:

class CSVValidator(luigi.Task):
    jsonfile = luigi.Parameter()
    docfile = luigi.Parameter()
    error_limit = luigi.Parameter()
    order_fields = luigi.Parameter()
    output_file = luigi.Parameter()
    header = luigi.DictParameter()

    def output(self):
        return luigi.LocalTarget(self.output_file + "/csv_validator_data_%s.txt" % time.time())

    def run(self):
        output_file = self.output().open('w')
        files = {}
        data = {}
        files["jsonfile"] = open(self.jsonfile, 'rb')
        files["docfile"] = open(self.docfile, 'rb')
        data["error_limit"] = self.error_limit
        data["order_fields"] = self.order_fields
        r = requests.post(*****~~~~~*****~~~~~,
                      headers=headers,
                      data=data, files=files)
        task_response = r.text.encode(encoding="UTF-8")
        print type(task_response)
        print(task_response)
        jsontaskdata = json.loads(task_response)
        json.dump(jsontaskdata, output_file)
        print("validated")
        output_file.close()

Однако эта задача никогда не запускается. Вместо этого центральный планировщик luigi утверждает, что эта задача уже выполнена:

===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 CSVValidator(...)
* 1 ran successfully:
    - 1 Downloader(...)

Этот прогресс выглядит :) потому что не было невыполненных задач или отсутствующих зависимостей

Другие созданные мной задачи, например, «Загрузчик», каждый раз успешно выполняются. Что здесь определяет полную задачу? Я не понимаю, что это значит.

Спасибо за ваше время!


person Meg Anderson    schedule 25.10.2018    source источник


Ответы (1)


Целевой объект, который возвращается методом вывода, определяет, завершена ли Задача или нет.

Объект может быть создан, если уже существует определенный выходной файл или другое условие, включая доступность внешних ресурсов. Например, в luigi.contrib.esindex.py наличие (проверка на наличие) индекса ElasticSearch в некотором удаленном кластере создаст целевой объект и сообщит вам, что задача (CopyIndex) завершена.

Вы также можете посмотреть на этот ответ: https://stackoverflow.com/a/34638943/4125622

И это обсуждение: https://github.com/spotify/luigi/issues/595

person Phil    schedule 02.01.2019