Joblib Parallel использует только одно ядро, если запущено из QThread

Я разрабатываю графический интерфейс, который выполняет тяжелую обработку чисел. Чтобы ускорить процесс, я использую параллельное выполнение joblib вместе с QThreads pyqt, чтобы графический интерфейс не переставал отвечать. Параллельное выполнение пока работает нормально, но если оно встроено в графический интерфейс и запускается в собственном потоке, оно использует только одно из четырех моих ядер. Что-нибудь фундаментальное, что я пропустил в мире потоковой / многопроцессорной обработки?

Вот примерный набросок моей установки:

 class ThreadRunner(QtCore.QObject):

    start = QtCore.pyqtSignal()
    result_finished = QtCore.pyqtSignal(np.ndarray)

    def __init__(self, function, *args, **kwargs):
        super(DispMapRunner, self).__init__()

        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.start.connect(self.run)

    def run(self):
        print "New Thread started"
        result = self.function(*self.args, **self.kwargs)
        self.result_finished.emit(result)

class Gui(QtGui.QMainWindow, form_class):
    def __init__(self, cl_args, parent=None):
        super(Gui, self).__init__()
        #other stuff

    def start_thread(self, fun, *args, **kwargs):
        self.runner = ThreadRunner(fun, *args, **kwargs)
        self.thread = QtCore.QThread() 
        self.runner.moveToThread(self.thread)
        # more stuff for catching results

def slice_producer(data):
    n_images, rows, cols = data.shape[:3]
    for r in range(rows):
        yield np.copy(data[:,r,...])

    def run_parallel(data, *args, **kwargs):
        results = joblib.Parallel(
                    n_jobs=4,
                    verbose=12,
                    pre_dispatch='1.5*n_jobs'
                    )
                    (
                    delayed(
                    memory.cache(do_long_computation))
                    (slice, **kwargs) for slice in slice_producer(data)
                    )   

Надеюсь, он не слишком длинный и в то же время слишком расплывчатый. Я использую pyqt4 4.11.3 и joblib 0.8.4.

Я снова проверил свой код и заметил следующее предупреждение:

UserWarning: Multiprocessing backed parallel loops cannot 
be nested below threads, setting n_jobs=1

Что уточняет мой вопрос до следующего: как запустить многопроцессорный процесс в отдельном потоке?


person mgutsche    schedule 31.08.2015    source источник
comment
Пробовали ли вы использовать простой многопроцессорный пул вместо joblib?   -  person ekhumoro    schedule 31.08.2015
comment
Я читал об этом, но мне было непонятно, как добиться неблокирующего выполнения моей функции. Результаты apply_async должны быть откуда-то извлечены, и графический интерфейс не должен ждать этого, а скорее в потоке, верно?   -  person mgutsche    schedule 01.09.2015
comment
Потоки на самом деле не нужны, поскольку вы можете создать настраиваемое событие в обратный вызов apply_async, а затем используйте postEvent, чтобы избежать блокировки графического интерфейса пользователя . Фактически, можно было бы использовать этот подход с joblib (с которым я совсем не знаком) и добиться аналогичных результатов.   -  person ekhumoro    schedule 01.09.2015


Ответы (1)


Хорошо, благодаря ekhumoro я пришел к тому, что работает, использует только экземпляр mp.pool и работает с обратными вызовами. Единственный недостаток заключается в том, что ошибки в дочернем процессе завершаются сбоем (например, изменение результатов приводит к появлению f_wrapper). Вот код для использования в будущем:

from PyQt4.QtCore import *
from PyQt4.QtGui import *
import multiprocessing
import sys
import numpy as np
import time

def f(data_slice, **kwargs):
    '''This is a time-intensive function, which we do not want to alter
    '''
    data = 0
    for row in range(data_slice.shape[0]):
        for col in range(data_slice.shape[1]):
            data += data_slice[row,col]**2
    time.sleep(0.1)
    return data, 3, 5, 3 # some dummy calculation results


def f_wrapper(row, data_slice,  **kwargs):
    results = f(data_slice, **kwargs)
    return row, results

class MainWindow(QMainWindow): #You can only add menus to QMainWindows

    def __init__(self):
        super(MainWindow, self).__init__()
        self.pool = multiprocessing.Pool(processes=4)

        button1 = QPushButton('Connect', self)
        button1.clicked.connect(self.apply_connection)
        self.text = QTextEdit()

        vbox1 = QVBoxLayout()
        vbox1.addWidget(button1)
        vbox1.addWidget(self.text)
        myframe = QFrame()
        myframe.setLayout(vbox1)

        self.setCentralWidget(myframe)
        self.show() #display and activate focus
        self.raise_()


    def apply_connection(self):
        self.rows_processed = list()
        self.max_size = 1000
        data = np.random.random(size = (100, self.max_size,self.max_size))
        kwargs = {'some_kwarg' : 1000}
        for row in range(data.shape[1]):
            slice = data[:,row, :]
            print "starting f for row ", row 
            result = self.pool.apply_async(f_wrapper, 
                                           args = (row, slice), 
                                           kwds = kwargs,
                                           callback=self.update_gui)
            #~ result.get() # blocks gui, but raises errors for debugging


    def update_gui(self, result):
        row, func_result = result
        self.rows_processed.append(row)
        print len(self.rows_processed)
        print func_result# or do something more intelligent
        self.text.append('Applied connection. Row = %d\n' % row)
        if len(self.rows_processed) == self.max_size:
            print "Done!" 




if __name__ == '__main__':
    app = QApplication(sys.argv)
    gui = MainWindow()
    app.exec_()

Если есть хороший способ фиксировать ошибки, это будет хорошим бонусом.

person mgutsche    schedule 03.09.2015