Интеграция multiprocessing.Process с concurrent.future._base.Future

У меня есть требование создавать дочерние процессы, получать результаты с помощью Future, а затем при необходимости убивать некоторые из них.

Для этого я создал подкласс класса multiprocessing.Process и возвращаю объект Future из метода start().

Проблема в том, что я не могу получить результат в функции cb(), так как она никогда не вызывается.

Пожалуйста, помогите/предложите, можно ли это сделать каким-то другим способом или что-то, чего мне не хватает в моей текущей реализации?

Ниже приведен мой текущий подход

from multiprocessing import Process, Queue
from concurrent.futures import _base
import threading
from time import sleep


def foo(x,q):
    print('result {}'.format(x*x))
    result = x*x
    sleep(5)
    q.put(result)


class MyProcess(Process):

    def __init__(self, target, args):
        super().__init__()
        self.target = target
        self.args = args
        self.f = _base.Future()

    def run(self):
        q = Queue()
        worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,)))
        worker_thread.start()
        r = q.get(block=True)
        print('setting result {}'.format(r))
        self.f.set_result(result=r)
        print('done setting result')

    def start(self):
        f = _base.Future()
        run_thread = threading.Thread(target=self.run)
        run_thread.start()
        return f


def cb(future):
    print('received result in callback {}'.format(future))


def main():
    p1 = MyProcess(target=foo, args=(2,))
    f = p1.start()
    f.add_done_callback(fn=cb)

    sleep(10)


if __name__ == '__main__':

    main()

    print('Main thread dying')

person Majeed Khan    schedule 18.12.2017    source источник


Ответы (1)


В вашем методе запуска вы создаете новое будущее, которое затем возвращаете. Это другое будущее, чем то, на которое вы установили результат, это будущее просто не используется вообще. Пытаться:

def start(self):
    run_thread = threading.Thread(target=self.run)
    run_thread.start()
    return self.f

Однако с вашим кодом больше проблем. Вы переопределяете метод start процесса, заменяя его выполнением в рабочем потоке, тем самым фактически обходя многопроцессорность. Также вам не следует импортировать модуль _base, который является деталью реализации, как видно из начального подчеркивания. Вы должны импортировать concurrent.futures.Future (это тот же класс, но через публичный API).

Это действительно использует многопроцессорность:

from multiprocessing import Process, Queue
from concurrent.futures import Future
import threading
from time import sleep


def foo(x,q):
    print('result {}'.format(x*x))
    result = x*x
    sleep(5)
    q.put(result)

class MyProcess(Process):

    def __init__(self, target, args):
        super().__init__()
        self.target = target
        self.args = args
        self.f = Future()

    def run(self):
        q = Queue()
        worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,)))
        worker_thread.start()
        r = q.get(block=True)
        print('setting result {}'.format(r))
        self.f.set_result(result=r)
        print('done setting result')

def cb(future):
    print('received result in callback {}: {}'.format(future, future.result()))

def main():
    p1 = MyProcess(target=foo, args=(2,))
    p1.f.add_done_callback(fn=cb)
    p1.start()
    p1.join()
    sleep(10)

if __name__ == '__main__':
    main()
    print('Main thread dying')

И теперь вы уже находитесь в новом процессе, создание рабочего потока для выполнения вашей целевой функции на самом деле не должно быть необходимым, вместо этого вы можете просто выполнить свою целевую функцию напрямую. Если целевая функция вызовет исключение, о котором вы не знаете, ваш обратный вызов будет вызван только в случае успеха. Итак, если вы это исправите, у вас останется:

from multiprocessing import Process
from concurrent.futures import Future
import threading
from time import sleep


def foo(x):
    print('result {}'.format(x*x))
    result = x*x
    sleep(5)
    return result

class MyProcess(Process):

    def __init__(self, target, args):
        super().__init__()
        self.target = target
        self.args = args
        self.f = Future()

    def run(self):
        try:
            r = self.target(*self.args)
            print('setting result {}'.format(r))
            self.f.set_result(result=r)
            print('done setting result')
        except Exception as ex:
            self.f.set_exception(ex)

def cb(future):
    print('received result in callback {}: {}'.format(future, future.result()))

def main():
    p1 = MyProcess(target=foo, args=(2,))
    p1.f.add_done_callback(fn=cb)
    p1.start()
    p1.join()
    sleep(10)

if __name__ == '__main__':
    main()
    print('Main thread dying')

Это в основном то, что делает ProcessPoolExecutor.

person mata    schedule 18.12.2017
comment
Спасибо за ответ. Это была действительно глупая ошибка. Спасибо, что заметили! :) - person Majeed Khan; 18.12.2017