Предоставляют ли многопроцессорные пулы каждому процессу одинаковое количество задач или они назначаются как доступные?

Когда вы map итерируете multiprocessing.Pool, итерации делятся на очередь для каждого процесса в пуле в начале, или есть общая очередь, из которой берется задача, когда процесс освобождается?

    def generate_stuff():
        for foo in range(100):
             yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

Итак, учитывая этот непроверенный код предложения; если в пуле 4 процесса, каждому процессу выделяется 25 элементов для выполнения, или 100 элементов выбираются один за другим процессами, которые ищут элементы для выполнения, чтобы каждый процесс мог выполнять разное количество элементов, например 30 , 26, 24, 20.


person John Mee    schedule 07.11.2012    source источник
comment
Это не имеет отношения к вашему вопросу, но если ваша итерация является генератором или другим ленивым типом, вы, вероятно, захотите использовать imap вместо map и передать явный параметр chunksize.   -  person abarnert    schedule 07.11.2012
comment
о, это актуально и применимо, учитывая, что я не уверен, что chunksize по умолчанию для map - отсутствие указанного значения по умолчанию подтверждает мои подозрения в комментариях ниже - он распределяет всю партию поровну для каждого процесса в начале.   -  person John Mee    schedule 07.11.2012
comment
Как я уже упоминал в своем ответе, вы можете прочитать источник. map занимает chunksize=None. Затем в map_async (который он использует) if chunksize is None он устанавливает chunksize, extra = divmod(len(iterable), len(self.pool) * 4) (а затем if extra, chunksize += 1). Итак, если у вас есть пул из 8 рабочих и 100 рабочих мест, chunksize будет 4.   -  person abarnert    schedule 07.11.2012
comment
классно; также объясняет, почему map проходит через всю итерацию в начале - он находит len. Я вижу, собираюсь ли я yield, тогда мне в любом случае следует использовать imap. Спасибо всем!   -  person John Mee    schedule 07.11.2012
comment
Как я сказал ниже, это компромисс. map проходит через всю итерацию, что означает задержку перед запуском и / или запуск в памяти (ничего страшного для 100 int, но, скажем, для 1000 результатов веб-пауков это, вероятно, неприемлемо, а тем более, скажем, itertools.repeat…). Но это немного проще, и вы получаете значение по умолчанию chunksize вместо того, чтобы вычислять / измерять / угадывать.   -  person abarnert    schedule 07.11.2012
comment
Также объясняется, почему после более чем 24 часов работы моя длинная очередь из 10 000+ с 8 процессами постоянно сокращается: каждый процесс медленно умирает один за другим - размер chunksize превышает 300 каждый. Поскольку каждая задача занимает 30-60 секунд каждая, неудивительно, что прошло 3 часа с момента завершения первого процесса; наконец-то осталось закончить только один процесс. zzzzz. живи и учись.   -  person John Mee    schedule 07.11.2012


Ответы (3)


Итак, учитывая этот непроверенный код предложения; если в пуле 4 процесса, каждому процессу выделяется 25 элементов для выполнения, или 100 элементов выбираются один за другим процессами, которые ищут элементы для выполнения, чтобы каждый процесс мог выполнять разное количество элементов, например 30 , 26, 24, 20.

Что ж, очевидный ответ - проверить это.

Как есть, тест может мало рассказать вам, потому что задания будут завершены как можно скорее, и возможно, что в конечном итоге все будет равномерно распределено, даже если объединенные процессы захватят задания по мере их готовности. Но есть простой способ исправить это:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)

Если числа «неровные», вы знаете, что объединенные процессы должны получать новые задания как готовые. (Я явно установил chunksize в 1, чтобы убедиться, что фрагменты не настолько велики, чтобы каждый получил только один фрагмент.)

Когда я запускаю его на 8-ядерной машине:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

Итак, похоже, что процессы получают новые рабочие места на лету.

Поскольку вы конкретно спросили о 4 рабочих, я изменил Pool() на Pool(4) и получил следующее:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})

Однако есть еще лучший способ узнать, чем тестирование: прочтите источник.

Как видите, map просто вызывает map_async, который создает группу пакетов и помещает их в объект self._taskqueue (экземпляр Queue.Queue). Если вы читаете дальше, эта очередь не используется совместно с другими процессами напрямую, но есть поток диспетчера пула, который всякий раз, когда процесс завершается и возвращает результат, выталкивает следующее задание из очереди и отправляет его обратно процессу.

Таким же образом можно узнать размер блока по умолчанию для map. Реализация 2.7, связанная выше, показывает, что она просто округлена len(iterable) / (len(self._pool) * 4) (немного более подробна, чем это, чтобы избежать дробной арифметики) - или, другими словами, достаточно большой для примерно 4 фрагментов на процесс. Но на это не стоит полагаться; документация нечетко и косвенно подразумевает, что будет использоваться какая-то эвристика, но не дает никаких гарантий относительно того, что это будет. Итак, если вам действительно нужно «около 4 частей на процесс», рассчитайте это явно. Более реалистично, если вам когда-либо понадобится что-то, кроме значения по умолчанию, вам, вероятно, понадобится значение, зависящее от предметной области, которое вы собираетесь обработать (путем расчета, предположения или профилирования).

person abarnert    schedule 07.11.2012
comment
спасибо приятель. Что касается теста, я не знал, как считать. Я думал, что мне нужно придумать, как поделиться переменной или что-то в этом роде. Этот процесс подсчета проницателен. Вам нужен pool.join() после закрытия, чтобы убедиться, что все сделано, прежде чем выплевывать счета? - person John Mee; 07.11.2012
comment
Помните, что map возвращает значение для каждого задания и объединяет их в список (а map_async, imap и imap_unordered дают вам одну и ту же информацию по-разному), поэтому вам редко нужно делать какое-либо межпроцессное совместное использование только для того, чтобы получить такую ​​информацию. - person abarnert; 07.11.2012
comment
Что касается join, в этом случае он вам не нужен: map блокируется до тех пор, пока не будут возвращены все 100 его результатов, и нет других заданий отправки кода. Но да, если вы хотите поэкспериментировать с другими методами распределения рабочих мест, она вам может понадобиться. - person abarnert; 07.11.2012

http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map

map(func, iterable[, chunksize])

Этот метод разбивает итерируемый объект на несколько частей, которые он отправляет в пул процессов как отдельные задачи. (Приблизительный) размер этих фрагментов можно указать, задав для chunksize положительное целое число.

Я предполагаю, что процесс забирает следующий фрагмент из очереди, когда завершает работу с предыдущим фрагментом.

Значение по умолчанию chunksize зависит от длины iterable и выбирается таким образом, чтобы количество фрагментов было примерно в четыре раза больше количества процессов. (источник)

person Janne Karila    schedule 07.11.2012
comment
Замечу, что размер блока по умолчанию для imap указан как 1. Интересно, что по умолчанию для map? Что касается того, что делает мое приложение сейчас, я подозреваю, что вначале оно делит карту на равные части; но не уверен в этом - отсюда и вопрос. - person John Mee; 07.11.2012
comment
@JohnMee: Причина, по которой значение по умолчанию для imap равно 1, состоит в том, что imap не знает длины iterable, поэтому он не может эвристически угадать лучший chunksize. (И да, это означает, что есть компромисс - иногда на самом деле быстрее построить list из iterable, просто чтобы получить эту эвристику. Но обычно вы можете придумать лучший chunksize в любом случае, просто зная пространство проблемы .) - person abarnert; 07.11.2012

Чтобы оценить chunksize, используемый реализацией Python, не глядя на multiprocessing исходный код модуля, запустите:

#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby

def work(index):
    mp.get_logger().info(index)
    return index, mp.current_process().name

if __name__ == "__main__":
    import logging
    import sys
    logger = mp.log_to_stderr()

    # process cmdline args
    try:
        sys.argv.remove('--verbose')
    except ValueError:
        pass  # not verbose
    else:
        logger.setLevel(logging.INFO)  # verbose
    nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
    # choices: 'map', 'imap', 'imap_unordered'
    map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
    kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}

    # estimate chunksize used
    max_chunksize = 0
    map_func = getattr(mp.Pool(nprocesses), map_name)
    for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
                                   key=lambda x: x[0]),  # sort by index
                            key=lambda x: x[1]):  # group by process name
        max_chunksize = max(max_chunksize, len(list(group)))
    print("%s: max_chunksize %d" % (map_name, max_chunksize))

Он показывает, что imap, imap_unordered используют chunksize=1 по умолчанию, а max_chunksize для map зависит от nprocesses, nitem (количество фрагментов на процесс не фиксировано) и max_chunksize зависит от версии Python. Все *map* функции учитывают параметр chunksize, если он указан.

использование

$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]

Чтобы увидеть, как распределяются отдельные рабочие места; укажите параметр --verbose.

person jfs    schedule 07.11.2012