Модуль многопроцессорности Python: объединение процессов с тайм-аутом

Я занимаюсь оптимизацией параметров сложной симуляции. Я использую многопроцессорный модуль для повышения производительности алгоритма оптимизации. Основы многопроцессорной обработки я изучил на http://pymotw.com/2/multiprocessing/basics.html< /а>. Сложная симуляция длится разное время в зависимости от заданных параметров алгоритма оптимизации, от 1 до 5 минут. Если параметры выбраны очень плохо, симуляция может длиться 30 минут и более, а результаты бесполезны. Поэтому я подумал о том, чтобы установить тайм-аут для многопроцессорной обработки, который завершает все симуляции, которые длятся дольше определенного времени. Вот абстрактная версия проблемы:

import numpy as np
import time
import multiprocessing

def worker(num):

    time.sleep(np.random.random()*20)

def main():

    pnum = 10    

    procs = []
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print 'starting', p.name

    for p in procs:
        p.join(5)
        print 'stopping', p.name

if __name__ == "__main__":
    main()

Строка p.join(5) определяет тайм-аут 5 секунд. Из-за цикла for for p in procs: программа ждет 5 секунд, пока не завершится первый процесс, а затем еще 5 секунд, пока не завершится второй процесс, и так далее, но я хочу, чтобы программа завершала все процессы, которые длятся более 5 секунд. Кроме того, если ни один из процессов не длится дольше 5 секунд, программа не должна ждать эти 5 секунд.


person brp    schedule 26.09.2014    source источник
comment
Взгляните сюда: stackoverflow.com/q/1191374/2615940. Это может быть дубликат, но я недостаточно уверен, чтобы отметить его для вас. Если предложенное решение этого ответа не работает для вас, сообщите нам, почему.   -  person skrrgwasme    schedule 26.09.2014
comment
Это интересная статья, но, как мне кажется, это решение для последовательно и не одновременно запущенных процессов. Моя программа должна запускать процессы одновременно и убивать те, которые превышают «глобальный» тайм-аут.   -  person brp    schedule 27.09.2014


Ответы (3)


Вы можете сделать это, создав цикл, который будет ждать некоторое время ожидания в секундах, часто проверяя, завершены ли все процессы. Если они не все завершатся за отведенное время, завершите все процессы:

TIMEOUT = 5 
start = time.time()
while time.time() - start <= TIMEOUT:
    if not any(p.is_alive() for p in procs):
        # All the processes are done, break now.
        break

    time.sleep(.1)  # Just to avoid hogging the CPU
else:
    # We only enter this if we didn't 'break' above.
    print("timed out, killing all processes")
    for p in procs:
        p.terminate()
        p.join()
person dano    schedule 26.09.2014
comment
Спасибо, это похоже на подходящее решение. К сожалению, этот код не сломается, если процессы завершатся до истечения времени ожидания. Я попробовал это, установив для рабочей функции значение time.sleep(1), и через 1 секунду все p.is_alive() возвращают False. Таким образом, код должен перейти к оператору break сейчас, но он все еще ждет тайм-аута... - person brp; 27.09.2014
comment
Я нашел проблему: print (p.is_alive() for p in procs) возвращает <generator object <genexpr> at 0x05712B20>, но это должен быть список с элементами True или False, чтобы быть понятным для any() - person brp; 27.09.2014
comment
@brp используйте any([p.is_alive() for p in procs]). Таким образом, он становится пониманием списка вместо выражения генератора. - person dano; 27.09.2014
comment
@brp о, я только что заметил, что ты используешь np.any вместо встроенного any. Вот почему выражение генератора не сработало. np.any работает только с массивоподобными объектами. - person dano; 27.09.2014
comment
встроенный any с пониманием списка - это ключ! Благодарю вас! - person brp; 27.09.2014

Если вы хотите убить все процессы, которые вы могли бы использовать в пуле из многопроцессорной обработки, вам необходимо определить общий тайм-аут для всего выполнения, а не отдельные тайм-ауты.

import numpy as np
import time
from multiprocessing import Pool

def worker(num):
    xtime = np.random.random()*20
    time.sleep(xtime)
    return xtime

def main():

    pnum = 10
    pool = Pool()
    args = range(pnum)
    pool_result = pool.map_async(worker, args)

    # wait 5 minutes for every worker to finish
    pool_result.wait(timeout=300)

    # once the timeout has finished we can try to get the results
    if pool_result.ready():
        print pool_result.get(timeout=1)

if __name__ == "__main__":
    main()

Это даст вам список с возвращаемыми значениями для всех ваших воркеров по порядку.
Дополнительная информация здесь: https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool

person Raiden Drake    schedule 31.03.2015
comment
Я не думаю, что это на самом деле завершает потоки в пуле — он просто возвращает выполнение основному потоку, даже если они еще не завершены. - person Ben Wheeler; 29.01.2021
comment
Я не понимаю, почему мы делаем pool_result.get(timeout=1), т.е.: если pool_result уже готов, не должны ли быть готовы и результаты, и тайм-аут не нужен? - person Kieleth; 16.06.2021

Благодаря помощи dano я нашел решение:

import numpy as np
import time
import multiprocessing

def worker(num):

    time.sleep(np.random.random()*20)

def main():

    pnum = 10    
    TIMEOUT = 5 
    procs = []
    bool_list = [True]*pnum

    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print 'starting', p.name

    start = time.time()
    while time.time() - start <= TIMEOUT:
        for i in range(pnum):
            bool_list[i] = procs[i].is_alive()

        print bool_list

        if np.any(bool_list):  
            time.sleep(.1)  
        else:
            break
    else:
        print("timed out, killing all processes")
        for p in procs:
            p.terminate()

    for p in procs:
        print 'stopping', p.name,'=', p.is_alive()
        p.join()

if __name__ == "__main__":
    main()

Это не самый элегантный способ, я уверен, что есть лучший способ, чем использование bool_list. Процессы, которые все еще живы после тайм-аута в 5 секунд, будут уничтожены. Если вы устанавливаете более короткое время в рабочей функции, чем тайм-аут, вы увидите, что программа останавливается до того, как истечет тайм-аут в 5 секунд. Я все еще открыт для более элегантных решений, если они есть :)

person brp    schedule 27.09.2014