Используя concurrent.futures.ProcessPoolExecutor, я пытаюсь запустить первый фрагмент кода для параллельного выполнения функции «Calculate_Forex_Data_Derivatives (data, grid_spacing)». При вызове результатов executor_list[i].result() я получаю «BrokenProcessPool: процесс в пуле процессов был внезапно завершен, пока будущее выполнялось или ожидалось». Я попытался запустить код, отправляющий несколько вызовов функции в пул обработки, а также запускающий код, отправляющий только один вызов в пул обработки, что привело к ошибке.
Я также проверил структуру кода с помощью более простого фрагмента кода (предоставлен второй код) с теми же типами ввода для функции вызова, и он отлично работает. Единственное отличие, которое я вижу между двумя фрагментами кода, это то, что первый код вызывает функцию «FinDiff (ось, сетка_промежуток, производный_порядок)» из модуля «findiff». Эта функция вместе с «Calculate_Forex_Data_Derivatives(data,gride_spacing)» отлично работает сама по себе при обычном последовательном выполнении.
Я использую среду Anaconda, редактор Spyder и Windows.
Любая помощь будет оценена по достоинству.
#code that returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."
import pandas as pd
import numpy as np
from findiff import FinDiff
import multiprocessing
import concurrent.futures
def Calculate_Forex_Data_Derivatives(forex_data,dt): #function to run in parallel
try:
dClose_dt = FinDiff(0,dt,1)(forex_data)[-1]
except IndexError:
dClose_dt = np.nan
try:
d2Close_dt2 = FinDiff(0,dt,2)(forex_data)[-1]
except IndexError:
d2Close_dt2 = np.nan
try:
d3Close_dt3 = FinDiff(0,dt,3)(forex_data)[-1]
except IndexError:
d3Close_dt3 = np.nan
return dClose_dt, d2Close_dt2, d3Close_dt3
#input for function
#forex_data is pandas dataframe, forex_data['Close'].values is numpy array
#dt is numpy array
#input_1 and input_2 are each a list of numpy arrays
input_1 = []
input_2 = []
for forex_data_index,data_point in enumerate(forex_data['Close'].values[:1]):
input_1.append(forex_data['Close'].values[:forex_data_index+1])
input_2.append(dt[:forex_data_index+1])
def multi_processing():
executors_list = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
for index in range(len(input_1)):
executors_list.append(executor.submit(Calculate_Forex_Data_Derivatives,input_1[index],input_2[index]))
return executors_list
if __name__ == '__main__':
print('calculating derivatives')
executors_list = multi_processing()
for output in executors_list
print(output.result()) #returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."
##############################################################
#simple example that runs fine
def function(x,y): #function to run in parallel
try:
asdf
except NameError:
a = (x*y)[0]
b = (x+y)[0]
return a,b
x=[np.array([0,1,2]),np.array([3,4,5])] #function inputs, list of numpy arrays
y=[np.array([6,7,8]),np.array([9,10,11])]
def multi_processing():
executors_list = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
for index,_ in enumerate(x):
executors_list.append(executor.submit(function,x[index],y[index]))
return executors_list
if __name__ == '__main__':
executors_list = multi_processing()
for output in executors_list: #prints as expected
print(output.result()) #(0, 6)
#(27, 12)