Я хочу запустить в общей сложности nAnalysis=25 моделей Abaqus, каждая из которых использует X ядер, и я могу одновременно запускать nParallelLoops=5 этих моделей. Если один из текущих 5 анализов завершится, то должен начаться другой анализ, пока не будут завершены все nAnalysis.
Я реализовал приведенный ниже код на основе решений, опубликованных в разделах 1 и 2. Однако я что-то упускаю, потому что все nAnalysis пытаются начать с "однажды", код блокируется, и анализ никогда не завершается, поскольку многие из них могут захотеть использовать одни и те же ядра. чем использует уже начатый анализ.
- Использование многопроцессорного модуля Python для одновременного и раздельного выполнения SEAWAT/MODFLOW запуск модели
- Как распараллелить этот вложенный цикл в Python, который вызывает Абак
def runABQfile(*args):
import subprocess
import os
inpFile,path,jobVars = args
prcStr1 = (path+'/runJob.sh')
process = subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path)
def safeABQrun(*args):
import os
try:
runABQfile(*args)
except Exception as e:
print("Tread Error: %s runABQfile(*%r)" % (e, args))
def errFunction(ppos, *args):
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import wait
with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,nAnalysis)) # 5Nodes
wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
Единственный способ до сих пор, который я могу запустить, - это изменить errFunction
, чтобы использовать ровно 5 анализов одновременно, как показано ниже. Однако этот подход иногда приводит к тому, что один из анализов занимает гораздо больше времени, чем остальные 4 в каждой группе (каждый ProcessPoolExecutor
вызов), и поэтому следующая группа из 5 не запускается, несмотря на доступность ресурсов (ядер). В конечном итоге это приводит к увеличению времени для завершения всех 25 моделей.
def errFunction(ppos, *args):
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import wait
# Group 1
with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,5)) # 5Nodes
wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
# Group 2
with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(5,10)) # 5Nodes
wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
# Group 3
with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(10,15)) # 5Nodes
wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
# Group 4
with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(15,20)) # 5Nodes
wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
# Group 5
with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(20,25)) # 5Nodes
wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
Я пытался использовать функцию as_completed
, но она тоже не работает.
Пожалуйста, не могли бы вы помочь определить правильное распараллеливание, чтобы я мог запустить nAnalysis, всегда выполняя одновременно nParallelLoops? Ваша помощь приветствуется. Я использую Python 2.7
Бестс, Дэвид П.
ОБНОВЛЕНИЕ 30 ИЮЛЯ 2016 Г.:
Я ввел цикл в safeABQrun
, и он управлял 5 различными «очередями». Цикл необходим, чтобы избежать случая, когда анализ пытается запуститься в узле, в то время как другой все еще работает. Анализ предварительно настроен для запуска на одном из запрошенных узлов перед началом любого фактического анализа.
def safeABQrun(*list_args):
import os
inpFiles,paths,jobVars = list_args
nA = len(inpFiles)
for k in range(0,nA):
args = (inpFiles[k],paths[k],jobVars[k])
try:
runABQfile(*args) # Actual Run Function
except Exception as e:
print("Tread Error: %s runABQfile(*%r)" % (e, args))
def errFunction(ppos, *args):
with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args) # 5Nodes
for f in as_completed(futures):
print("|=== Finish Process Train %d ===|" % futures[f])
if f.exception() is not None:
print('%r generated an exception: %s' % (futures[f], f.exception()))