Использование Concurrent.Futures.ProcessPoolExecutor для запуска одновременных и независимых моделей ABAQUS

Я хочу запустить в общей сложности nAnalysis=25 моделей Abaqus, каждая из которых использует X ядер, и я могу одновременно запускать nParallelLoops=5 этих моделей. Если один из текущих 5 анализов завершится, то должен начаться другой анализ, пока не будут завершены все nAnalysis.

Я реализовал приведенный ниже код на основе решений, опубликованных в разделах 1 и 2. Однако я что-то упускаю, потому что все nAnalysis пытаются начать с "однажды", код блокируется, и анализ никогда не завершается, поскольку многие из них могут захотеть использовать одни и те же ядра. чем использует уже начатый анализ.

  1. Использование многопроцессорного модуля Python для одновременного и раздельного выполнения SEAWAT/MODFLOW запуск модели
  2. Как распараллелить этот вложенный цикл в Python, который вызывает Абак
def runABQfile(*args):    
    import subprocess
    import os

    inpFile,path,jobVars = args

    prcStr1 = (path+'/runJob.sh')

    process = subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path)

def safeABQrun(*args):
    import os

    try:
        runABQfile(*args)
    except Exception as e:
        print("Tread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait

    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,nAnalysis))  # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

Единственный способ до сих пор, который я могу запустить, - это изменить errFunction, чтобы использовать ровно 5 анализов одновременно, как показано ниже. Однако этот подход иногда приводит к тому, что один из анализов занимает гораздо больше времени, чем остальные 4 в каждой группе (каждый ProcessPoolExecutor вызов), и поэтому следующая группа из 5 не запускается, несмотря на доступность ресурсов (ядер). В конечном итоге это приводит к увеличению времени для завершения всех 25 моделей.

def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait    

    # Group 1
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,5))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 2
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(5,10))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 3
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(10,15))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 4
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(15,20))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

    # Group 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(20,25))  # 5Nodes        
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')

Я пытался использовать функцию as_completed, но она тоже не работает.

Пожалуйста, не могли бы вы помочь определить правильное распараллеливание, чтобы я мог запустить nAnalysis, всегда выполняя одновременно nParallelLoops? Ваша помощь приветствуется. Я использую Python 2.7

Бестс, Дэвид П.


ОБНОВЛЕНИЕ 30 ИЮЛЯ 2016 Г.:

Я ввел цикл в safeABQrun, и он управлял 5 различными «очередями». Цикл необходим, чтобы избежать случая, когда анализ пытается запуститься в узле, в то время как другой все еще работает. Анализ предварительно настроен для запуска на одном из запрошенных узлов перед началом любого фактического анализа.

def safeABQrun(*list_args):
    import os

    inpFiles,paths,jobVars = list_args

    nA = len(inpFiles)
    for k in range(0,nA): 
        args = (inpFiles[k],paths[k],jobVars[k])
        try:
            runABQfile(*args) # Actual Run Function
        except Exception as e:
            print("Tread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args)  # 5Nodes

        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
               print('%r generated an exception: %s' % (futures[f], f.exception()))

person David P.    schedule 29.07.2016    source источник


Ответы (2)


Мне кажется, все в порядке, но я не могу запустить ваш код как есть. Как насчет того, чтобы попробовать что-то гораздо более простое, а затем добавлять до тех пор, пока не появится "проблема"? Например, показывает ли следующее поведение, которое вы хотите? Это работает на моей машине, но я использую Python 3.5.2. Вы говорите, что используете 2.7, но concurrent.futures не существовало в Python 2, поэтому, если вы используете 2.7, у вас должен быть запущен чей-то бэкпорт библиотеки, и, возможно, проблема в этом. Попробуйте следующее, чтобы ответить, так ли это:

from concurrent.futures import ProcessPoolExecutor, wait, as_completed

def worker(i):
    from time import sleep
    from random import randrange
    s = randrange(1, 10)
    print("%d started and sleeping for %d" % (i, s))
    sleep(s)

if __name__ == "__main__":
    nAnalysis = 25
    nParallelLoops = 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(worker, k), k) for k in range(nAnalysis))
        for f in as_completed(futures):
            print("got %d" % futures[f])

Типичный вывод:

0 started and sleeping for 4
1 started and sleeping for 1
2 started and sleeping for 1
3 started and sleeping for 6
4 started and sleeping for 5
5 started and sleeping for 9
got 1
6 started and sleeping for 5
got 2
7 started and sleeping for 6
got 0
8 started and sleeping for 6
got 4
9 started and sleeping for 8
got 6
10 started and sleeping for 9
got 3
11 started and sleeping for 6
got 7
12 started and sleeping for 9
got 5
...
person Tim Peters    schedule 29.07.2016

Я ввел цикл в safeABQrun, и он управлял 5 различными «очередями». Цикл необходим, чтобы избежать случая, когда анализ пытается запуститься в узле, в то время как другой все еще работает. Анализ предварительно настроен для запуска в одном из запрошенных узлов перед началом любого фактического анализа.

def safeABQrun(*list_args):
    import os

    inpFiles,paths,jobVars = list_args

    nA = len(inpFiles)
    for k in range(0,nA): 
        args = (inpFiles[k],paths[k],jobVars[k])
        try:
            runABQfile(*args) # Actual Run Function
        except Exception as e:
            print("Tread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args)  # 5Nodes

        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
               print('%r generated an exception: %s' % (futures[f], f.exception()))
person David P.    schedule 30.07.2016