функция оценивается только тогда, когда она вызывается из приложения, но не когда она вызывается из apply_async

Я прочитал process-or-pool-for- что я делаю и когда использовать-применить-применить-асинхронную-или-карту, и я надеялся, что понимаю разницу между apply и apply_async. Но у меня есть этот код, который возвращает желаемый результат только при использовании apply и очень короткий, когда используется apply_async:

#!/bin/env python
import multiprocessing
import time
import os

semaphore = multiprocessing.Semaphore(1)
# semaphore = multiprocessing.Manager().Semaphore(1)

def producer(num, len, output):
    time.sleep(1)
    element = "PROCESS: %d PID: %d, PPID: %d, QSIZE: %d" % (num, os.getpid(), os.getppid(), output.qsize())
    semaphore.acquire()
    print "PID: %s WRITE -> %s" % (os.getpid(), element)
    if (num == len - 1):
        print "PID: %d WRITE -> Everything was written inside queue, no more apply_async calling, just reading!" % os.getpid()
    output.put(element)
    semaphore.release()
    time.sleep(1)

def consumer(output):
    while True:
      try:
        print "PID: %d READ  <- %s" % (os.getpid(), output.get())
        break
      except:
        print "PID: %d READ  <- NOTHING IN BUFFER" % os.getpid()
        # pass
      time.sleep(1)

if __name__ == '__main__':
    """
    MULTIPLE PRODUCERS AND MULTIPLE CONSUMERS
    """
    output    = multiprocessing.Manager().Queue()
    pool      = multiprocessing.Pool(4)
    lst       = range(40)

    print "Calling apply*!"
    for i in lst:
        pool.apply_async(producer, (i, len(lst), output))
    print "Do not wait until apply* finishes!"

    for i in lst:
        # RETURNS OUTPUT
        # pool.apply(consumer, (output,))

        # DOES NOT RETURN OUTPUT
        pool.apply_async(consumer, (output,))

Вывод при использовании pool.apply:

Calling apply*!
Do not wait until apply* finishes!
PID: 18348 WRITE -> PROCESS: 1 PID: 18348, PPID: 18341, QSIZE: 0
PID: 18346 WRITE -> PROCESS: 0 PID: 18346, PPID: 18341, QSIZE: 1
PID: 18349 WRITE -> PROCESS: 2 PID: 18349, PPID: 18341, QSIZE: 2
PID: 18347 WRITE -> PROCESS: 3 PID: 18347, PPID: 18341, QSIZE: 3
PID: 18346 WRITE -> PROCESS: 4 PID: 18346, PPID: 18341, QSIZE: 4
PID: 18348 WRITE -> PROCESS: 5 PID: 18348, PPID: 18341, QSIZE: 5
PID: 18349 WRITE -> PROCESS: 6 PID: 18349, PPID: 18341, QSIZE: 6
PID: 18347 WRITE -> PROCESS: 7 PID: 18347, PPID: 18341, QSIZE: 7
...

Вывод при использовании pool.apply_async:

Calling apply*!
Do not wait until apply* finishes!

Похоже, что producer оценивается только при вызове из apply, но не при вызове из apply_async. Почему?


person Wakan Tanka    schedule 03.09.2015    source источник


Ответы (1)


Ваш код оценивается в любом случае, однако это делается в другом процессе. Разница в том, что apply блокирует, а apply_async нет. В своем коде вы отправляете работу другому процессу, а затем никогда не собираете ее обратно в основной процесс.

Обратите внимание, что apply возвращает значение, а apply_async возвращает объект результата. Вы должны вызвать get для объекта результата, чтобы получить результат. Вот дистиллированный пример:

>>> import multiprocessing
>>> import math
>>> 
>>> p = multiprocessing.Pool() 
>>> p.apply(math.sin, (.5,))
0.479425538604203
>>> result = p.apply_async(math.sin, (.5,))
>>> result 
<multiprocessing.pool.ApplyResult object at 0x103edc350>
>>> result.get()
0.479425538604203
>>> 

Если вы выполняете цикл for для apply или apply_async, вы можете подумать об использовании map или map_async.

>>> p.map(math.sin, range(5))
[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282]
>>> result = p.map_async(math.sin, range(5))
>>> result.get()
[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282]
person Mike McKerns    schedule 03.09.2015
comment
Спасибо за ваш ответ, я знаю о функциях map *, но основная проблема с их использованием заключается в том, что они не могут обрабатывать (простой способ) несколько аргументов. Можно поинтересоваться, что происходит с процессами, когда я не вызывал result.get(), они как-то очищаются или что? - person Wakan Tanka; 04.09.2015
comment
@WakanTanka: Нет, если вы не вызовете get, задания в других процессах завершатся и будут ждать в очереди. Они очищаются только при close или terminate других процессах. Если вам нужен map, который принимает несколько аргументов, проверьте мою вилку multiprocessing (называемую pathos.multiprocessing) - см.: stackoverflow.com/a/28001397 /2379433. Он также использует гораздо лучшую сериализацию, чем стандартная версия библиотеки. - person Mike McKerns; 04.09.2015