Как исправить BrokenProcessPool: ошибка для concurrent.futures ProcessPoolExecutor

Используя concurrent.futures.ProcessPoolExecutor, я пытаюсь запустить первый фрагмент кода для параллельного выполнения функции «Calculate_Forex_Data_Derivatives (data, grid_spacing)». При вызове результатов executor_list[i].result() я получаю «BrokenProcessPool: процесс в пуле процессов был внезапно завершен, пока будущее выполнялось или ожидалось». Я попытался запустить код, отправляющий несколько вызовов функции в пул обработки, а также запускающий код, отправляющий только один вызов в пул обработки, что привело к ошибке.

Я также проверил структуру кода с помощью более простого фрагмента кода (предоставлен второй код) с теми же типами ввода для функции вызова, и он отлично работает. Единственное отличие, которое я вижу между двумя фрагментами кода, это то, что первый код вызывает функцию «FinDiff (ось, сетка_промежуток, производный_порядок)» из модуля «findiff». Эта функция вместе с «Calculate_Forex_Data_Derivatives(data,gride_spacing)» отлично работает сама по себе при обычном последовательном выполнении.

Я использую среду Anaconda, редактор Spyder и Windows.

Любая помощь будет оценена по достоинству.

#code that returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."

import pandas as pd
import numpy as np
from findiff import FinDiff
import multiprocessing
import concurrent.futures

def Calculate_Forex_Data_Derivatives(forex_data,dt):  #function to run in parallel
    try:
        dClose_dt = FinDiff(0,dt,1)(forex_data)[-1]
    except IndexError:
        dClose_dt = np.nan

    try:   
        d2Close_dt2 = FinDiff(0,dt,2)(forex_data)[-1]
    except IndexError:
        d2Close_dt2 = np.nan

    try:
        d3Close_dt3 = FinDiff(0,dt,3)(forex_data)[-1]
    except IndexError:
        d3Close_dt3 = np.nan

    return dClose_dt, d2Close_dt2, d3Close_dt3

#input for function
#forex_data is pandas dataframe, forex_data['Close'].values is numpy array
#dt is numpy array
#input_1 and input_2 are each a list of numpy arrays

input_1 = []
input_2 = []
for forex_data_index,data_point in enumerate(forex_data['Close'].values[:1]):
    input_1.append(forex_data['Close'].values[:forex_data_index+1])
    input_2.append(dt[:forex_data_index+1])


def multi_processing():
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index in range(len(input_1)):
            executors_list.append(executor.submit(Calculate_Forex_Data_Derivatives,input_1[index],input_2[index]))

    return executors_list

if __name__ == '__main__':
    print('calculating derivatives')
    executors_list = multi_processing()

for output in executors_list
    print(output.result()) #returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."


##############################################################


#simple example that runs fine

def function(x,y):  #function to run in parallel
    try:
        asdf
    except NameError:
        a = (x*y)[0]
        b = (x+y)[0]

    return  a,b

x=[np.array([0,1,2]),np.array([3,4,5])]    #function inputs, list of numpy arrays
y=[np.array([6,7,8]),np.array([9,10,11])]

def multi_processing():    
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index,_ in enumerate(x):
            executors_list.append(executor.submit(function,x[index],y[index]))

    return executors_list

if __name__ == '__main__':
    executors_list = multi_processing()

for output in executors_list:   #prints as expected
    print(output.result())      #(0, 6)
                                #(27, 12)

person ZachV    schedule 14.07.2019    source источник


Ответы (2)


Я знаю три типичных способа сломать конвейер ProcessPoolExecutor:

Уничтожение/прекращение работы ОС

Ваша система сталкивается с ограничениями, скорее всего, с памятью, и начинает убивать процессы. Поскольку форк в Windows клонирует содержимое вашей памяти, это вполне вероятно при работе с большими кадрами данных.

Как определить

  • Проверьте потребление памяти в диспетчере задач.
  • Если ваши DataFrames не занимают половину вашей памяти, они должны исчезнуть с max_workers=1, однако это не однозначно.

Самоувольнение работника

Экземпляр подпроцесса Python завершается из-за некоторой ошибки, которая не вызывает надлежащего исключения. Одним из примеров может быть segfault в импортированном C-модуле.

Как определить

Поскольку ваш код работает правильно без PPE, единственный сценарий, о котором я могу думать, - это если какой-то модуль не является многопроцессорным. Затем у него также есть шанс исчезнуть вместе с max_workers=1. Также может быть возможно вызвать ошибку в основном процессе, вызвав функцию вручную сразу после создания рабочих процессов (строка после цикла for, которая вызывает executor.submit. В противном случае это может быть очень сложно идентифицировать, но, на мой взгляд, это самый маловероятный случай.

Исключение в Кодексе СИЗ

Сторона подпроцесса канала (то есть код, обрабатывающий связь) может дать сбой, что приведет к правильному исключению, которое, к сожалению, не может быть передано главному процессу.

Как определить

Поскольку код (надеюсь) хорошо протестирован, главный подозреваемый лежит в возвращаемых данных. Его нужно замариновать и отправить обратно через сокет — оба шага могут дать сбой. Итак, вы должны проверить:

  • возвращаемые данные можно выбрать?
  • Достаточно ли мал маринованный объект для отправки (около 2 ГБ)?

Таким образом, вы можете либо попытаться вернуть некоторые простые фиктивные данные, либо явно проверить два условия:

    if len(pickle.dumps((dClose_dt, d2Close_dt2, d3Close_dt3))) > 2 * 10 ** 9: 
        raise RuntimeError('return data can not be sent!')

В Python 3.7 эта проблема исправлена, и он возвращает исключение.

person Matthias Huschle    schedule 15.07.2019
comment
Спасибо за отзыв. Перепробовал все предложенные вами методы диагностики. Я пытался работать с одним ядром и вводить массивы numpy только с одним элементом, поэтому я знаю, что это не проблема с передачей слишком большого количества данных. Я все еще думаю, что это проблема с модулем «findiff», потому что импорт стороннего модуля — это действительно единственное, что отличается от моего сломанного кода и простого примера, который работает. Я попытался импортировать модуль в разные места кода и функций, а также определить функцию, которая использует модуль в разных местах. Я застрял и мог бы начать искать другие маршруты. - person ZachV; 15.07.2019

Я нашел это в официальных документах:

Модуль main должен быть доступен для импорта рабочими подпроцессами. Это означает, что ProcessPoolExecutor не будет работать в интерактивном интерпретаторе. Вызов методов Executor или Future из вызываемого объекта, отправленного в ProcessPoolExecutor, приведет к взаимоблокировке.

Вы когда-нибудь пробовали это? Для меня работает следующее:

if __name__ == '__main__':
     executors_list = multi_processing()
     for output in executors_list:
         print(output.result())
person 莫昌钦    schedule 27.02.2021