Итак, учитывая этот непроверенный код предложения; если в пуле 4 процесса, каждому процессу выделяется 25 элементов для выполнения, или 100 элементов выбираются один за другим процессами, которые ищут элементы для выполнения, чтобы каждый процесс мог выполнять разное количество элементов, например 30 , 26, 24, 20.
Что ж, очевидный ответ - проверить это.
Как есть, тест может мало рассказать вам, потому что задания будут завершены как можно скорее, и возможно, что в конечном итоге все будет равномерно распределено, даже если объединенные процессы захватят задания по мере их готовности. Но есть простой способ исправить это:
import collections
import multiprocessing
import os
import random
import time
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
#print moo
time.sleep(random.randint(0, 50) / 10.)
return os.getpid()
pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)
Если числа «неровные», вы знаете, что объединенные процессы должны получать новые задания как готовые. (Я явно установил chunksize
в 1, чтобы убедиться, что фрагменты не настолько велики, чтобы каждый получил только один фрагмент.)
Когда я запускаю его на 8-ядерной машине:
Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})
Итак, похоже, что процессы получают новые рабочие места на лету.
Поскольку вы конкретно спросили о 4 рабочих, я изменил Pool()
на Pool(4)
и получил следующее:
Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})
Однако есть еще лучший способ узнать, чем тестирование: прочтите источник.
Как видите, map
просто вызывает map_async
, который создает группу пакетов и помещает их в объект self._taskqueue
(экземпляр Queue.Queue
). Если вы читаете дальше, эта очередь не используется совместно с другими процессами напрямую, но есть поток диспетчера пула, который всякий раз, когда процесс завершается и возвращает результат, выталкивает следующее задание из очереди и отправляет его обратно процессу.
Таким же образом можно узнать размер блока по умолчанию для map
. Реализация 2.7, связанная выше, показывает, что она просто округлена len(iterable) / (len(self._pool) * 4)
(немного более подробна, чем это, чтобы избежать дробной арифметики) - или, другими словами, достаточно большой для примерно 4 фрагментов на процесс. Но на это не стоит полагаться; документация нечетко и косвенно подразумевает, что будет использоваться какая-то эвристика, но не дает никаких гарантий относительно того, что это будет. Итак, если вам действительно нужно «около 4 частей на процесс», рассчитайте это явно. Более реалистично, если вам когда-либо понадобится что-то, кроме значения по умолчанию, вам, вероятно, понадобится значение, зависящее от предметной области, которое вы собираетесь обработать (путем расчета, предположения или профилирования).
person
abarnert
schedule
07.11.2012
imap
вместоmap
и передать явный параметрchunksize
. - person abarnert   schedule 07.11.2012chunksize
по умолчанию дляmap
- отсутствие указанного значения по умолчанию подтверждает мои подозрения в комментариях ниже - он распределяет всю партию поровну для каждого процесса в начале. - person John Mee   schedule 07.11.2012map
занимаетchunksize=None
. Затем вmap_async
(который он использует)if chunksize is None
он устанавливаетchunksize, extra = divmod(len(iterable), len(self.pool) * 4)
(а затемif extra
,chunksize += 1
). Итак, если у вас есть пул из 8 рабочих и 100 рабочих мест,chunksize
будет 4. - person abarnert   schedule 07.11.2012map
проходит через всю итерацию в начале - он находитlen
. Я вижу, собираюсь ли яyield
, тогда мне в любом случае следует использоватьimap
. Спасибо всем! - person John Mee   schedule 07.11.2012map
проходит через всю итерацию, что означает задержку перед запуском и / или запуск в памяти (ничего страшного для 100 int, но, скажем, для 1000 результатов веб-пауков это, вероятно, неприемлемо, а тем более, скажем,itertools.repeat
…). Но это немного проще, и вы получаете значение по умолчаниюchunksize
вместо того, чтобы вычислять / измерять / угадывать. - person abarnert   schedule 07.11.2012