В документации, на которую вы ссылаетесь, указано, что Parallel
имеет необязательный индикатор выполнения. Это реализовано с использованием аргумента ключевого слова callback
, предоставленного multiprocessing.Pool.apply_async
:
# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1
...
class CallBack(object):
""" Callback used by parallel: it is used for progress reporting, and
to add data to be processed
"""
def __init__(self, index, parallel):
self.parallel = parallel
self.index = index
def __call__(self, out):
self.parallel.print_progress(self.index)
if self.parallel._original_iterable:
self.parallel.dispatch_next()
А вот print_progress
:
def print_progress(self, index):
elapsed_time = time.time() - self._start_time
# This is heuristic code to print only 'verbose' times a messages
# The challenge is that we may not know the queue length
if self._original_iterable:
if _verbosity_filter(index, self.verbose):
return
self._print('Done %3i jobs | elapsed: %s',
(index + 1,
short_format_time(elapsed_time),
))
else:
# We are finished dispatching
queue_length = self.n_dispatched
# We always display the first loop
if not index == 0:
# Display depending on the number of remaining items
# A message as soon as we finish dispatching, cursor is 0
cursor = (queue_length - index + 1
- self._pre_dispatch_amount)
frequency = (queue_length // self.verbose) + 1
is_last_item = (index + 1 == queue_length)
if (is_last_item or cursor % frequency):
return
remaining_time = (elapsed_time / (index + 1) *
(self.n_dispatched - index - 1.))
self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
(index + 1,
queue_length,
short_format_time(elapsed_time),
short_format_time(remaining_time),
))
То, как они это реализуют, если честно, довольно странное — похоже, предполагается, что задачи всегда будут выполняться в том порядке, в котором они были запущены. Переменная index
, ведущая к print_progress
, — это всего лишь переменная self.n_dispatched
на момент фактического запуска задания. Таким образом, первое запущенное задание всегда будет заканчиваться с index
равным 0, даже если, скажем, третье задание завершится первым. Это также означает, что они на самом деле не отслеживают количество завершенных заданий. Таким образом, вам не нужно отслеживать переменную экземпляра.
Я думаю, что вам лучше всего создать свой собственный класс CallBack и параллельный патч обезьяны:
from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed
class CallBack(object):
completed = defaultdict(int)
def __init__(self, index, parallel):
self.index = index
self.parallel = parallel
def __call__(self, index):
CallBack.completed[self.parallel] += 1
print("done with {}".format(CallBack.completed[self.parallel]))
if self.parallel._original_iterable:
self.parallel.dispatch_next()
import joblib.parallel
joblib.parallel.CallBack = CallBack
if __name__ == "__main__":
print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))
Вывод:
done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Таким образом, ваш обратный вызов вызывается всякий раз, когда задание завершается, а не по умолчанию.
person
dano
schedule
27.07.2014