Как объединить несколько файлов после завершения многопроцессорной обработки в Python?

В моем коде многопроцессорный процесс используется для одновременного создания нескольких заданий impdp (импорта), и каждое задание создает файл журнала с динамическим именем:

'/DP_IMP_' + DP_PDB_FULL_NAME[i] + '' + DP_WORKLOAD + '' + str(vardate) + '.log'

vardate = datetime.now().strftime("%d-%b-%Y-%I_%M_%S_%p")
tempfiles = []
for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + DP_PDB_FULL_NAME[i] + '_' + DP_WORKLOAD  +  '_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload, args=(DP_WORKLOAD, DP_DURATION_SECONDS, vardate, ))
                 p1.start()

Я хочу объединить все файлы журналов, созданные в один большой основной файл журнала, после завершения всех процессов. Но когда я пытаюсь использовать что-то подобное в цикле (for i in range((len(DP_PDB_FULL_NAME))):

with open('DATAPUMP_IMP_' + str(vardate) + '.log','wb') as wfd:
    for f in tempfiles:
        with open(f,'rb') as fd:
            shutil.copyfileobj(fd, wfd)

затем он пытается записать файлы до завершения процессов.

Здесь DP_PDB_FULL_NAME — это список нескольких баз данных, поэтому несколько процессов порождаются одновременно в нескольких БД. Когда я пытаюсь добавить p1.join() после завершения цикла, многопроцессорность не происходит в нескольких БД.

Итак, как мне создать основной файл журнала после завершения всех отдельных процессов?


person Vaidehi    schedule 18.05.2021    source источник


Ответы (2)


Вы должны создать какую-то структуру, в которой вы храните необходимые переменные и дескрипторы процессов. Блокируйте с помощью join после этого цикла, пока все подпроцессы не будут готово, а затем работать с полученными файлами.

handles = []
for i in range(10):
    p = Process()
    p.start()
    handles.append(p)

for handle in handles:
    handle.join()

person potato_cannon    schedule 18.05.2021
comment
Когда я попытался добавить p1.join() сразу после p1.start(), многопроцессорность не работает. Где вы предлагаете мне добавить соединение? - person Vaidehi; 18.05.2021

Итак, я добавил p1.join() после окончания первого цикла, и теперь он работает!

vardate = datetime.now().strftime("%d-%b-%Y-%I_%M_%S_%p")
tempfiles = []
for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + DP_PDB_FULL_NAME[i] + '_' + DP_WORKLOAD  +  '_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload, args=(DP_WORKLOAD, DP_DURATION_SECONDS, vardate, ))
                 p1.start()
p1.join()

with open('DATAPUMP_IMP_' + str(vardate) + '.log','wb') as wfd:
    for f in tempfiles:
        with open(f,'rb') as fd:

Чтобы объяснить это далее, есть три случая добавления объединения в приведенную выше сцену, и многопроцессорность работает соответственно.

  1. Внутри самого внутреннего цикла for:

    Итак, если здесь добавить соединение, то многопроцессорность вообще не будет работать, потому что она сразу после proc.start()

for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload, args=(DP_WORKLOAD, ))
                 p1.start()
                 p1.join()
  1. Внутри внешнего цикла for (вне самого внутреннего цикла for)

    Здесь многопроцессорность работает только для внутреннего цикла, а не для нескольких БД.

for i in range((len(DP_PDB_FULL_NAME))):
        for DP_WORKLOAD in DP_WORKLOAD_NAME:
                 tempfiles.append(logdir + '/DP_IMP_' + str(vardate) + '.log')
                 p1 = multiprocessing.Process(target=imp_workload, args=(DP_WORKLOAD, ))
                 p1.start()
        p1.join()
  1. Вне внешнего цикла for

    Это правильное решение (упомянутое выше), когда оно находится вне всех циклов, использующих многопроцессорность.

person Vaidehi    schedule 18.05.2021
comment
Это работает, но есть одна важная проблема. В настоящее время вы всегда вызываете соединение только для последнего инициализированного подпроцесса. Если он завершится быстрее, чем предыдущие процессы, вы можете снова столкнуться с первоначальной проблемой — потерей данных. Я добавлю редактирование для вас - person potato_cannon; 27.05.2021
comment
@potato_cannon да, ты прав. Я столкнулся с точной проблемой. Итак, как вы предлагаете мне внести изменения? - person Vaidehi; 11.06.2021
comment
Я отредактировал принятый ответ для уточнения - person potato_cannon; 12.06.2021