Отслеживание хода выполнения joblib.Parallel

Есть ли простой способ отслеживать общий ход выполнения joblib.Parallel?

У меня есть длительное выполнение, состоящее из тысяч заданий, которые я хочу отслеживать и записывать в базу данных. Однако для этого всякий раз, когда Parallel завершает задачу, мне нужно, чтобы он выполнял обратный вызов, сообщая, сколько оставшихся заданий осталось.

Я уже выполнял аналогичную задачу с Python stdlib multiprocessing.Pool, запустив поток, который записывает количество ожидающих заданий в списке заданий пула.

Глядя на код, Parallel наследует пул, поэтому я подумал, что смогу провернуть тот же трюк, но, похоже, он не использует этот список, и я не смог понять, как еще «читать» его внутренний статус любым другим способом.


person Cerin    schedule 27.07.2014    source источник


Ответы (8)


Почему нельзя просто использовать tqdm? Следующее сработало для меня

from joblib import Parallel, delayed
from datetime import datetime
from tqdm import tqdm

def myfun(x):
    return x**2

results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in tqdm(range(1000))
100%|██████████| 1000/1000 [00:00<00:00, 10563.37it/s]
person Jon    schedule 20.04.2018
comment
Очень аккуратный. Спасибо. - person Cerin; 28.01.2019
comment
Я не думаю, что на самом деле это отслеживает выполнение запущенных заданий, а просто ставит в очередь задания. Если бы вы вставили time.sleep(1) в начале myfun, вы бы обнаружили, что прогресс tqdm завершается почти мгновенно, но для заполнения results требуется еще несколько секунд. - person Noah; 01.02.2019
comment
Да, это отчасти правильно. Он отслеживает запуск задания по сравнению с завершением, но другая проблема заключается в том, что после завершения всех заданий также возникает задержка, вызванная накладными расходами. После завершения всех задач необходимо собрать результаты, и это может занять некоторое время. - person Jon; 01.02.2019
comment
Я считаю, что этот ответ на самом деле не отвечает на вопрос. Как уже упоминалось, при таком подходе будет отслеживаться очередь, а не само выполнение. Подход с обратным вызовом, показанный ниже, кажется более точным в отношении вопроса. - person devforfu; 22.05.2019
comment
@devforfu да, об этом говорилось в предыдущем комментарии. - person Jon; 23.05.2019
comment
Этот ответ неверен, так как он не отвечает на вопрос. Этот ответ не должен быть принят. - person Henry Henrinson; 11.07.2020
comment
ответ frenzykryger ниже содержит отличное решение проблемы этого ответа. - person Christian Steinmeyer; 20.08.2020
comment
Это не правильно. Он подсчитывает только время начала работы, которое происходит немедленно. - person anilbey; 25.03.2021

Еще один шаг вперед по сравнению с ответами Дано и Коннора — это обернуть все это как контекстный менеджер:

import contextlib
import joblib
from tqdm import tqdm    
from joblib import Parallel, delayed

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""
    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    old_batch_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_batch_callback
        tqdm_object.close()    

Затем вы можете использовать его так и не оставлять исправленный код обезьяны после того, как вы это сделали:

with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))

Я думаю, что это здорово, и это похоже на интеграцию tqdm pandas.

person featuredpeow    schedule 19.11.2019
comment
Это должен быть лучший ответ! Спасибо - person TheDimLebowski; 07.02.2020
comment
Отличное решение. Протестировано с joblib 0.14.1 и tqdm 4.41.0 - отлично работает. Это было бы отличным дополнением к tqdm! - person dennisobrien; 02.04.2020
comment
Я не могу его отредактировать, но небольшая опечатка в решении, где joblib.parallel.BatchCompletionCallback на самом деле является BatchCompletionCallBack (обратите внимание на верблюжий случай в CallBack) - person Andrew; 15.06.2020

В документации, на которую вы ссылаетесь, указано, что Parallel имеет необязательный индикатор выполнения. Это реализовано с использованием аргумента ключевого слова callback, предоставленного multiprocessing.Pool.apply_async:

# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
            kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1

...

class CallBack(object):
    """ Callback used by parallel: it is used for progress reporting, and
        to add data to be processed
    """
    def __init__(self, index, parallel):
        self.parallel = parallel
        self.index = index

    def __call__(self, out):
        self.parallel.print_progress(self.index)
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

А вот print_progress:

def print_progress(self, index):
    elapsed_time = time.time() - self._start_time

    # This is heuristic code to print only 'verbose' times a messages
    # The challenge is that we may not know the queue length
    if self._original_iterable:
        if _verbosity_filter(index, self.verbose):
            return
        self._print('Done %3i jobs       | elapsed: %s',
                    (index + 1,
                     short_format_time(elapsed_time),
                    ))
    else:
        # We are finished dispatching
        queue_length = self.n_dispatched
        # We always display the first loop
        if not index == 0:
            # Display depending on the number of remaining items
            # A message as soon as we finish dispatching, cursor is 0
            cursor = (queue_length - index + 1
                      - self._pre_dispatch_amount)
            frequency = (queue_length // self.verbose) + 1
            is_last_item = (index + 1 == queue_length)
            if (is_last_item or cursor % frequency):
                return
        remaining_time = (elapsed_time / (index + 1) *
                    (self.n_dispatched - index - 1.))
        self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                    (index + 1,
                     queue_length,
                     short_format_time(elapsed_time),
                     short_format_time(remaining_time),
                    ))

То, как они это реализуют, если честно, довольно странное — похоже, предполагается, что задачи всегда будут выполняться в том порядке, в котором они были запущены. Переменная index, ведущая к print_progress, — это всего лишь переменная self.n_dispatched на момент фактического запуска задания. Таким образом, первое запущенное задание всегда будет заканчиваться с index равным 0, даже если, скажем, третье задание завершится первым. Это также означает, что они на самом деле не отслеживают количество завершенных заданий. Таким образом, вам не нужно отслеживать переменную экземпляра.

Я думаю, что вам лучше всего создать свой собственный класс CallBack и параллельный патч обезьяны:

from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed

class CallBack(object):
    completed = defaultdict(int)

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, index):
        CallBack.completed[self.parallel] += 1
        print("done with {}".format(CallBack.completed[self.parallel]))
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.CallBack = CallBack

if __name__ == "__main__":
    print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))

Вывод:

done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

Таким образом, ваш обратный вызов вызывается всякий раз, когда задание завершается, а не по умолчанию.

person dano    schedule 27.07.2014
comment
Отличное исследование, спасибо. Я не заметил атрибут обратного вызова. - person Cerin; 29.07.2014
comment
Я обнаружил, что документация joblib очень ограничена. Мне нужно покопаться в исходном коде этого класса CallBack. Мой вопрос: могу ли я настроить аргументы при вызове __call__? (подкласс всего класса Parallel может быть одним из способов, но для меня это тяжело). - person ziyuang; 12.02.2015

Расширение ответа Дано для новейшей версии библиотеки joblib. Во внутренней реализации было несколько изменений.

from joblib import Parallel, delayed
from collections import defaultdict

# patch joblib progress callback
class BatchCompletionCallBack(object):
  completed = defaultdict(int)

  def __init__(self, time, index, parallel):
    self.index = index
    self.parallel = parallel

  def __call__(self, index):
    BatchCompletionCallBack.completed[self.parallel] += 1
    print("done with {}".format(BatchCompletionCallBack.completed[self.parallel]))
    if self.parallel._original_iterator is not None:
      self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
person Connor Clark    schedule 23.01.2017

Индикатор выполнения текста

Еще один вариант для тех, кому нужен текстовый прогрессбар без дополнительных модулей типа tqdm. Актуально для joblib=0.11, python 3.5.2 на linux на 16.04.2018 и показывает ход выполнения подзадачи.

Переопределить собственный класс:

class BatchCompletionCallBack(object):
    # Added code - start
    global total_n_jobs
    # Added code - end
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel

    def __call__(self, out):
        self.parallel.n_completed_tasks += self.batch_size
        this_batch_duration = time.time() - self.dispatch_timestamp

        self.parallel._backend.batch_completed(self.batch_size,
                                           this_batch_duration)
        self.parallel.print_progress()
        # Added code - start
        progress = self.parallel.n_completed_tasks / total_n_jobs
        print(
            "\rProgress: [{0:50s}] {1:.1f}%".format('#' * int(progress * 50), progress*100)
            , end="", flush=True)
        if self.parallel.n_completed_tasks == total_n_jobs:
            print('\n')
        # Added code - end
        if self.parallel._original_iterator is not None:
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack

Определите глобальную константу перед использованием с общим количеством заданий:

total_n_jobs = 10

Это приведет к чему-то вроде этого:

Progress: [########################################          ] 80.0%
person Nikolay    schedule 16.04.2018
comment
Работает отлично. Если вы также хотите распечатать оценку времени, вы можете изменить __call__ следующим образом: ``` time_remaining = (this_batch_duration / self.batch_size) * (total_n_jobs - self.parallel.n_completed_tasks) print( \rProgress: [{0:50s} ] {1:.1f}% est {2:1f}мин осталось.format('#' * int(прогресс * 50), прогресс*100, время_оставшееся/60) , end=, flush=True) ``` - person lawrencegripper; 11.07.2019

Вот еще один ответ на ваш вопрос со следующим синтаксисом:

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

https://stackoverflow.com/a/40415477/232371

person Ben Usman    schedule 04.11.2016

Решение TLDR:

Работает с joblib 0.14.0 и tqdm 4.46.0 с использованием Python 3.5. Спасибо frenzykryger за предложения contextlib, dano и Connor за идею исправления обезьян.

import contextlib
import joblib
from tqdm import tqdm
from joblib import Parallel, delayed

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""

    def tqdm_print_progress(self):
        if self.n_completed_tasks > tqdm_object.n:
            n_completed = self.n_completed_tasks - tqdm_object.n
            tqdm_object.update(n=n_completed)

    original_print_progress = joblib.parallel.Parallel.print_progress
    joblib.parallel.Parallel.print_progress = tqdm_print_progress

    try:
        yield tqdm_object
    finally:
        joblib.parallel.Parallel.print_progress = original_print_progress
        tqdm_object.close()

Вы можете использовать это так же, как описано frenzykryger

import time
def some_method(wait_time):
    time.sleep(wait_time)

with tqdm_joblib(tqdm(desc="My method", total=10)) as progress_bar:
    Parallel(n_jobs=2)(delayed(some_method)(0.2) for i in range(10))

Подробное объяснение:

Решение Джона просто реализовать, но оно измеряет только отправленную задачу. Если задача занимает много времени, полоса застрянет на 100%, ожидая завершения выполнения последней отправленной задачи.

Подход менеджера контекста от frenzykryger, улучшенный от dano и Connor, лучше, но BatchCompletionCallBack также можно вызвать с ImmediateResult до завершения задачи (см. Промежуточные результаты из joblib). Это даст нам счет, превышающий 100%.

Вместо того, чтобы обезьяна исправляла BatchCompletionCallBack, мы можем просто исправить функцию print_progress в Parallel. BatchCompletionCallBack все равно уже называет это print_progress. Если установлено подробное описание (например, Parallel(n_jobs=2, verbose=100)), print_progress будет распечатывать завершенные задачи, хотя и не так хорошо, как tqdm. Глядя на код, print_progress — это метод класса, поэтому в нем уже есть self.n_completed_tasks, который регистрирует нужное нам число. Все, что нам нужно сделать, это просто сравнить это с текущим состоянием прогресса joblib и обновить, только если есть разница.

Это было протестировано в joblib 0.14.0 и tqdm 4.46.0 с использованием python 3.5.

person Magdrop    schedule 08.05.2020

В Jupyter tqdm каждый раз запускает новую строку в выводе. Итак, для Jupyter Notebook это будет:

Для использования в блокноте Jupyter. Нет сна:

from joblib import Parallel, delayed
from datetime import datetime
from tqdm import notebook

def myfun(x):
    return x**2

results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in notebook.tqdm(range(1000)))  

100% 1000/1000 [00:06<00:00, 143.70it/s]

С временем сна:

from joblib import Parallel, delayed
from datetime import datetime
from tqdm import notebook
from random import randint
import time

def myfun(x):
    time.sleep(randint(1, 5))
    return x**2

results = Parallel(n_jobs=7)(delayed(myfun)(i) for i in notebook.tqdm(range(100)))

Что я сейчас использую вместо joblib.Parallel:

import concurrent.futures
from tqdm import notebook
from random import randint
import time

iterable = [i for i in range(50)]

def myfun(x):
    time.sleep(randint(1, 5))
    return x**2

def run(func, iterable, max_workers=8):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(notebook.tqdm(executor.map(func, iterable), total=len(iterable)))
    return results

run(myfun, iterable)
person Дмитро Олександ&    schedule 31.05.2019
comment
Неправильно, это подсчитывает только время запуска задания, которое будет немедленным, независимо от того, какую функцию вы оборачиваете. - person anilbey; 25.03.2021
comment
Как это может быть неправильно, если это из официальной документации? joblib.readthedocs.io/en/latest Ctrl+F для параллельного (n_jobs=1) И мой ответ был о запуске tqdm в блокноте Jupyter. Он почти такой же, как и принятый. Разница лишь в том, что он предназначен для использования в блокноте Jupyter. - person Дмитро Олександ&; 07.04.2021
comment
Думаю, я понял. Похоже, ты прав. - person Дмитро Олександ&; 07.04.2021
comment
Однако в блокноте Jupyter это происходит не мгновенно. Например, 14% 14/100 [00:05‹00:31, 2,77 ит/с] Требуется время для завершения со случайным временем сна. - person Дмитро Олександ&; 07.04.2021