ThreadPoolExecutor внутри ProcessPoolExecutor

Я новичок в модуле Futures, и у меня есть задача, которая может быть полезна распараллеливание; но я, кажется, не могу точно понять, как настроить функцию для потока и функцию для процесса. Я был бы признателен за любую помощь, которую кто-либо может пролить по этому вопросу.

Я запускаю оптимизацию роя частиц (PSO). Не вдаваясь в подробности самого PSO, вот базовая структура моего кода:

Существует класс Particle с методом getFitness(self) (который вычисляет некоторую метрику и сохраняет ее в self.fitness). Симуляция PSO имеет несколько экземпляров частиц (легко более 10, 100 или даже 1000 для некоторых симуляций).
Время от времени мне приходится вычислять пригодность частиц. В настоящее время я делаю это в цикле for:

for p in listOfParticles:
  p.getFitness(args)

Однако я заметил, что пригодность каждой частицы можно вычислить независимо друг от друга. Это делает вычисление пригодности главным кандидатом на распараллеливание. Действительно, я мог бы сделать map(lambda p: p.getFitness(args), listOfParticles).

Теперь я могу легко сделать это с помощью futures.ProcessPoolExecutor:

with futures.ProcessPoolExecutor() as e:
  e.map(lambda p: p.getFitness(args), listOfParticles)

Поскольку побочные эффекты вызова p.getFitness хранятся в каждой частице, мне не нужно беспокоиться о возврате от futures.ProcessPoolExecutor().

Все идет нормально. Но теперь я замечаю, что ProcessPoolExecutor создает новые процессы, а значит копирует память, что медленно. Я хотел бы иметь возможность делиться памятью, поэтому я должен использовать потоки. Это хорошо, пока я не понял, что запуск нескольких процессов с несколькими потоками внутри каждого процесса, вероятно, будет быстрее, поскольку несколько потоков по-прежнему работают только на одном процессоре моей милой 8-ядерной машины.

Вот где я столкнулся с проблемой:
Судя по примерам, которые я видел, ThreadPoolExecutor работает с list. Как и ProcessPoolExecutor. Таким образом, я не могу сделать ничего итеративного в ProcessPoolExecutor, чтобы отдать на откуп ThreadPoolExecutor, потому что тогда ThreadPoolExecutor получит один объект для работы (см. мою попытку, опубликованную ниже).
С другой стороны, я не могу нарезать listOfParticles самостоятельно. , потому что я хочу, чтобы ThreadPoolExecutor творил свою собственную магию, чтобы выяснить, сколько потоков требуется.

Итак, большой вопрос (наконец-то):
как мне структурировать мой код, чтобы я мог эффективно распараллелить следующие процессы, используя как процессы, так и потоки:

for p in listOfParticles:
  p.getFitness()

Это то, что я пытался, но я бы не осмелился запустить его, потому что я знаю, что это не сработает:

>>> def threadize(func, L, mw):
...     with futures.ThreadpoolExecutor(max_workers=mw) as executor:
...             for i in L:
...                     executor.submit(func, i)
... 

>>> def processize(func, L, mw):
...     with futures.ProcessPoolExecutor() as executor:
...             executor.map(lambda i: threadize(func, i, mw), L)
...

Я был бы признателен за любые мысли о том, как это исправить или даже о том, как улучшить мой подход.

Если это имеет значение, я на python3.3.2


person inspectorG4dget    schedule 15.11.2013    source источник
comment
Какой код выполняет getFitness()? Проблема с потоками в CPython заключается в том, что они подходят только для задач, связанных с вводом-выводом, потому что CPython имеет глобальную блокировку интерпретатора (GIL), которая разрешает выполняться только одному потоку за раз. Если, например, getFitness() запускает код Python, привязанный к процессору, GIL заставит многопоточность работать медленнее, чем без многопоточности (тогда многопоточность просто добавляет дополнительные накладные расходы для переключения контекста). Но если, например, getFitness() запускает функцию модуля расширения, которая освобождает GIL, то может помочь многопоточность (например, многие функции numpy освобождают GIL).   -  person Tim Peters    schedule 24.11.2013
comment
getFitness интерпретирует информацию, закодированную в частице, как начальную семантику нейронной сети, запускает полученную нейронную сеть и вычисляет выходную ошибку (эта выходная ошибка является приспособленностью — ну, на самом деле, ее обратной величиной). Таким образом, я считаю, что эта функция будет больше привязана к процессору, чем к вводу-выводу (я сделал все, что связано с нейронной сетью, с нуля, и это все списки классов и их умножения). Так что, возможно, потоки не слишком помогут в этой конкретной ситуации, но я все же хотел бы иметь возможность использовать ThreadPool в ProcessPool для применимых проблем.   -  person inspectorG4dget    schedule 25.11.2013


Ответы (3)


Я дам вам рабочий код, который смешивает процессы с потоками для решения проблемы, но это не то, что вы ожидаете ;-) Первое, что нужно сделать, это создать фиктивную программу, которая не подвергает опасности ваши реальные данные. Поэкспериментируйте с чем-нибудь безобидным. Итак, вот начало:

class Particle:
    def __init__(self, i):
        self.i = i
        self.fitness = None
    def getfitness(self):
        self.fitness = 2 * self.i

Теперь нам есть во что поиграть. Далее некоторые константы:

MAX_PROCESSES = 3
MAX_THREADS = 2 # per process
CHUNKSIZE = 100

Поиграйте с ними по вкусу. CHUNKSIZE будет объяснено позже.

Первый сюрприз для вас — это то, что делает моя рабочая функция самого низкого уровня. Это потому, что вы слишком оптимистичны:

Поскольку побочные эффекты вызова p.getFitness сохраняются в каждой частице, мне не нужно беспокоиться о возврате от futures.ProcessPoolExecutor().

Увы, ничего, сделанное в рабочем процессе, не может повлиять на Particle экземпляров в вашей основной программе. Рабочий процесс работает с копиями Particle экземпляров, либо с помощью реализации копирования при записи fork(), либо потому, что он работает с копией, созданной в результате распаковки Particle pickle, переданного между процессами.

Таким образом, если вы хотите, чтобы ваша основная программа видела результаты фитнеса, вам необходимо организовать отправку информации обратно в основную программу. Поскольку я недостаточно знаю вашу реальную программу, здесь я предполагаю, что Particle().i является уникальным целым числом и что основная программа может легко отображать целые числа обратно в экземпляры Particle. Имея это в виду, рабочая функция самого низкого уровня здесь должна возвращать пару: уникальное целое число и результат пригодности:

def thread_worker(p):
    p.getfitness()
    return (p.i, p.fitness)

Учитывая это, легко распределить список Particle по потокам и вернуть список (particle_id, fitness) результатов:

def proc_worker(ps):
    import concurrent.futures as cf
    with cf.ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
        result = list(e.map(thread_worker, ps))
    return result

Заметки:

  1. Это функция, которую будет выполнять каждый рабочий процесс.
  2. Я использую Python 3, поэтому используйте list(), чтобы заставить e.map() материализовать все результаты в списке.
  3. Как упоминалось в комментарии, в CPython распределение задач, связанных с ЦП, по потокам медленнее, чем выполнение их всех в одном потоке.

Остается только написать код, чтобы распределить список Particle по процессам и получить результаты. Это чертовски легко сделать с multiprocessing, поэтому я собираюсь использовать его. Я понятия не имею, может ли это сделать concurrent.futures (учитывая, что мы также смешиваем потоки), но мне все равно. Но поскольку я даю вам рабочий код, вы можете поиграть с ним и сообщить ;-)

if __name__ == "__main__":
    import multiprocessing

    particles = [Particle(i) for i in range(100000)]
    # Note the code below relies on that particles[i].i == i
    assert all(particles[i].i == i for i in range(len(particles)))

    pool = multiprocessing.Pool(MAX_PROCESSES)
    for result_list in pool.imap_unordered(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
        for i, fitness in result_list:
            particles[i].fitness = fitness

    pool.close()
    pool.join()

    assert all(p.fitness == 2*p.i for p in particles)

Заметки:

  1. Я разбиваю список Particles на куски "вручную". Вот для чего CHUNKSIZE. Это потому, что рабочему процессу нужен список из Particle для работы, и, в свою очередь, это то, что нужно функции futures map(). Хорошей идеей является разделение работы независимо от того, что вы получаете, чтобы получить реальную отдачу в обмен на межпроцессные накладные расходы для каждого вызова.
  2. imap_unordered() не дает никаких гарантий относительно порядка, в котором возвращаются результаты. Это дает реализации больше свободы для организации работы максимально эффективно. И нас не волнует порядок здесь, так что все в порядке.
  3. Обратите внимание, что цикл получает результаты (particle_id, fitness) и соответствующим образом изменяет экземпляры Particle. Возможно, ваш настоящий .getfitness вносит другие мутации в экземпляры Particle - не могу предположить. Несмотря на это, основная программа никогда не увидит никаких мутаций, сделанных в воркерах "по волшебству" - вы должны явно организовать это. Вместо этого вы можете вернуть (particle_id, particle_instance) пар и заменить Particle экземпляров в основной программе. Тогда они будут отражать все мутации, сделанные в рабочих процессах.

Веселиться :-)

Фьючерсы полностью вниз

Оказывается, заменить multiprocessing было очень просто. Вот изменения. Это также (как упоминалось ранее) заменяет исходные экземпляры Particle, чтобы зафиксировать все мутации. Однако здесь есть компромисс: для травления экземпляра требуется «намного больше» байтов, чем для травления одного результата «пригодности». Больше сетевого трафика. Выбрать свой яд ;-)

Для возврата измененного экземпляра требуется просто заменить последнюю строку thread_worker(), например:

return (p.i, p)

Затем замените весь блок «main» следующим:

def update_fitness():
    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as e:
        for result_list in e.map(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
            for i, p in result_list:
                particles[i] = p

if __name__ == "__main__":
    particles = [Particle(i) for i in range(500000)]
    assert all(particles[i].i == i for i in range(len(particles)))

    update_fitness()

    assert all(particles[i].i == i for i in range(len(particles)))
    assert all(p.fitness == 2*p.i for p in particles)

Код очень похож на танец multiprocessor. Лично я бы использовал версию multiprocessing, потому что imap_unordered ценна. Это проблема упрощенных интерфейсов: они часто покупают простоту за счет сокрытия полезных возможностей.

person Tim Peters    schedule 25.11.2013
comment
Не за что :-) Только что просмотрите редактирование: multiprocessing на самом деле не нужно в конце концов. - person Tim Peters; 25.11.2013
comment
Когда использовать ProcessPoolExecutor() вместо ThreadPoolExecutor() или наоборот для параллелизма? Как описано здесь, вы может даже объединить его с помощью объекта «Будущее» :S? - person danger89; 04.04.2014
comment
@TimPeters Есть ли причина, по которой вы импортируете concurrent.futures внутри функции? - person Al Guy; 11.08.2020
comment
@TimPeters Могу я попросить вас взглянуть на мой вопрос? stackoverflow.com/questions/63306875 / - person Al Guy; 11.08.2020

Во-первых, вы уверены, что сможете использовать многопоточность при загрузке всех ядер процессами? Если он привязан к процессору, вряд ли да. По крайней мере, некоторые тесты должны быть сделаны.

Если добавление потоков увеличивает вашу производительность, следующий вопрос заключается в том, можно ли добиться большей производительности с помощью ручной балансировки нагрузки или автоматической. Под «сделанным вручную» я подразумеваю тщательное разделение рабочей нагрузки на части одинаковой вычислительной сложности и создание нового процессора задач для каждой части — ваше оригинальное, но сомнительное решение. По автоматическому созданию пула процессов/потоков и связи в рабочей очереди для новых задач, к которым вы стремитесь. На мой взгляд, первый подход является одним из парадигм Apache Hadoop, второй реализуется обработчиками очереди работ, такими как Celery. Первый подход может страдать от того, что некоторые фрагменты задач работают медленнее и выполняются, в то время как другие завершаются, второй добавляет накладные расходы на коммутацию и ожидание задачи, и это вторая точка тестов производительности, которые необходимо выполнить.

Наконец, если вы хотите иметь статическую коллекцию процессов с многопоточностью внутри, AFAIK, вы не можете добиться этого с помощью concurrent.futures как есть, и вам придется немного изменить его. Я не знаю, существуют ли решения для этой задачи, но поскольку concurrent — это решение на чистом питоне (без кода на языке C), это можно легко сделать. Рабочий процессор определяется в _adjust_process_count рутине класса ProcessPoolExecutor, а его подклассы и переопределение с помощью многопоточного подхода довольно просты, вам просто нужно указать свой собственный _process_worker на основе concurrent.features.thread

Оригинал ProcessPoolExecutor._adjust_process_count для справки:

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p
person alko    schedule 15.11.2013
comment
Я бы предпочел использовать автоматическую балансировку нагрузки. Это потому, что распределение, хотя и полезное для моей симуляции, не имеет первостепенного значения. Итак, что я пытаюсь сделать, так это добиться большей эффективности с минимальными усилиями по программированию. Но, к вашему первому замечанию, почему я вряд ли улучшу производительность задачи, связанной с процессором, с несколькими процессами и несколькими потоками на процесс? - person inspectorG4dget; 15.11.2013
comment
@ spectorG4dget трудно сказать без фактической проверки кода getFitness, архитектуры ЦП и используемых команд, и это зависит от многих факторов, но основной причиной будет переключение контекста ЦП, промахи кеша ЦП и т. д. Удалось ли вам написать многопроцессорную / многопоточную настройку Excecutor, переопределяющую работать или нужна дополнительная помощь? - person alko; 16.11.2013
comment
Я не удосужился написать переопределяющую функцию настройки. Боюсь, на данный момент это немного не в моих силах. НО, что более важно, я не ищу абсолютно лучшее решение. Я хочу получить некоторое ускорение с минимальными усилиями, поэтому я не возражаю против неоптимального решения, если оно все же лучше, чем одиночный однопоточный процесс. - person inspectorG4dget; 17.11.2013

Это обобщенный ответ, в котором используется пакет threadedprocess, который реализует ThreadedProcesPoolExecutor, позволяя комбинированное использование потока пул внутри пула процессов. Ниже приведена служебная функция общего назначения, которая ее использует:

import concurrent.futures
import logging
from typing import Callable, Iterable, Optional

import threadedprocess

log = logging.getLogger(__name__)


def concurrently_execute(fn: Callable, fn_args: Iterable, max_processes: Optional[int] = None, max_threads_per_process: Optional[int] = None) -> None:
    """Execute the given callable concurrently using multiple threads and/or processes."""
    # Ref: https://stackoverflow.com/a/57999709/
    if max_processes == 1:
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_threads_per_process)
    elif max_threads_per_process == 1:
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=max_processes)  # type: ignore
    else:
        executor = threadedprocess.ThreadedProcessPoolExecutor(max_processes=max_processes, max_threads=max_threads_per_process)

    if max_processes and max_threads_per_process:
        max_workers = max_processes * max_threads_per_process
        log.info("Using %s with %s processes and %s threads per process, i.e. with %s workers.", executor.__class__.__name__, max_processes, max_threads_per_process, max_workers)

    with executor:
        futures = [executor.submit(fn, *fn_args_cur) for fn_args_cur in fn_args]

    for future in concurrent.futures.as_completed(futures):
        future.result()  # Raises exception if it occurred in process worker.
person Acumenus    schedule 18.09.2019