DASK - остановка воркеров во время выполнения приводит к тому, что завершенные задачи запускаются дважды

Я хочу использовать dask для обработки около 5000 пакетных задач, которые сохраняют свои результаты в реляционной базе данных, и после того, как все они будут завершены, я хочу запустить финальную задачу, которая запросит базу данных и сгенерирует файл результатов (который будет храниться в AWS. S3)

Так что это примерно так:

from dask import bag, delayed batches = bag.from_sequence(my_batches()) results = batches.map(process_batch_and_store_results_in_database) graph = delayed(read_database_and_store_bundled_result_into_s3)(results) client = Client('the_scheduler:8786') client.compute(graph)

И это работает, но: Ближе к концу обработки многие рабочие процессы простаивают, и я хотел бы иметь возможность отключить их (и сэкономить немного денег на AWS EC2), но если я это сделаю, планировщик «забудет» это эти задачи уже выполнены, и попробуйте запустить их снова на оставшихся рабочих процессах.

Я понимаю, что это на самом деле функция, а не ошибка, поскольку Dask пытается отслеживать все результаты перед запуском read_database_and_store_bundled_result_into_s3, но: Есть ли способ, которым я могу сказать dask, чтобы он просто организовал график распределенной обработки и не беспокоился о государственное управление?


person Tony Lâmpada    schedule 19.05.2017    source источник
comment
Кажется, эта проблема как-то связана с этим: github.com/dask/distributed/issues/847   -  person Tony Lâmpada    schedule 19.05.2017
comment
Также по теме: stackoverflow.com/questions/41965253/   -  person Tony Lâmpada    schedule 20.05.2017


Ответы (1)


Я рекомендую вам просто забыть о фьючерсах после их завершения. Это решение использует интерфейс dask.distributed concurrent.futures, а не dask.bag. В частности, он использует итератор as_completed.

from dask.distributed import Client, as_completed
client = Client('the_scheduler:8786')

futures = client.map(process_batch_and_store_results_in_database, my_batches())

seq = as_completed(futures)
del futures # now only reference to the futures is within seq

for future in seq:
    pass  # let future be garbage collected
person MRocklin    schedule 20.05.2017