Ошибка многопроцессорности Python с методами класса

Я пишу программу, в которой у меня есть объектно-ориентированный код, в котором я пытаюсь выполнять многопроцессорную обработку. Я получал ошибки рассола, потому что по умолчанию python может сериализовать функции, но не методы классов. Поэтому я использовал предложение Не могу pickle ‹введите 'instancemethod'› при использовании многопроцессорной функции Pool.map () в Python, но проблема в том, что если у меня есть лямбда-выражения внутри моих методов, это не работает. Мой пример кода выглядит следующим образом:

import numpy as np

from copy_reg import pickle
from types import MethodType
from multiprocessing.pool import ApplyResult
from _functools import partial
from _collections import defaultdict


class test(object):
    def __init__(self,words):
        self.words=words
#         self.testLambda = defaultdict(lambda : 1.)

    def parallel_function(self,f):
        def easy_parallize(f,sequence):
            from multiprocessing import Pool
            pool = Pool(processes=50) # depends on available cores
            result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
            cleaned = [x for x in result if not x is None] # getting results
            cleaned = np.asarray(cleaned)
            pool.close() # not optimal! but easy
            pool.join()
            return cleaned
        from functools import partial


        return partial(easy_parallize, f)

    def dummy(self):
        self.t=defaultdict(lambda:1.)

    def test(self,a,b,x):
        print x
        print a
        return x*x

    def testit(self):
        sequence=[1,2,3,4,5]
        f1=partial(self.test,'a','b')
        f_p=self.parallel_function(f1)
        results=f_p(sequence)


def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)



if __name__ ==   "__main__":
    pickle(MethodType, _pickle_method, _unpickle_method)
    t=test('fdfs')
    t.dummy()
    t.testit()

Но я получаю следующую ошибку из-за лямбда-выражения:

Traceback (most recent call last):
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 76, in <module>
    t.testit()
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 51, in testit
    results=f_p(sequence)
  File "/home/ngoyal/work/nlp_source/language-change/test.py", line 28, in easy_parallize
    result = pool.map(f, sequence) # for i in sequence: result[i] = f(i)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Есть ли какой-нибудь простой способ решить эту проблему, не переходя к какой-либо другой упаковке, в которой используется укроп или что-то в этом роде? Можно ли это сделать с помощью обычных библиотек Python? (Я использую Python 2.7)


person Naman    schedule 05.03.2015    source источник


Ответы (2)


Если вы посмотрите дальше по опубликованной вами ссылке… на мой ответ (https://stackoverflow.com/a/21345273/2379433), вы увидите, что действительно можете делать то, что хотите… даже если вы используете лямбда-выражения, словари по умолчанию и всевозможные другие конструкции python. Все, что вам нужно сделать, это заменить multiprocessing на pathos.multiprocessing… и все заработает. Заметьте, я даже работаю в интерпретаторе.

>>> import numpy as np
>>> from functools import partial
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> from collections import defaultdict
>>> 
>>> class test(object):
...   def __init__(self, words):
...     self.words = words
...   def parallel_function(self, f):
...     def easy_parallelize(f, sequence):
...       p = Pool()
...       result = p.map(f, sequence)
...       cleaned = [x for x in result if not x is None]
...       cleaned = np.asarray(cleaned)
...       return cleaned
...     return partial(easy_parallelize, f)
...   def dummy(self):
...     self.t = defaultdict(lambda: 1.)
...   def test(self, a, b, x):
...     print x
...     print a
...     print x*x
...   def testit(self):
...     sequence = [1,2,3,4,5]
...     f1 = partial(self.test, 'a','b')
...     f_p = self.parallel_function(f1)
...     results = f_p(sequence)
...     return results
... 
>>> t = test('fdfs')
>>> t.dummy()
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
array([], dtype=float64)

«Это работает», потому что pathos использует dill, сериализатор, который может обрабатывать почти все в Python. Вы даже можете динамически заменить метод, и он по-прежнему работает.

>>> def parallel_funtion(self, f):
...   def easy_parallelize(f, sequence):
...     p = Pool()
...     return p.map(f, sequence)
...   return partial(easy_parallelize, f)
... 
>>> test.parallel_function = parallel_funtion 
>>> 
>>> t.testit()
1
a
1
2
a
4
3
a
9
4
a
16
5
a
25
[None, None, None, None, None]

Получите pathos и dill здесь: https://github.com/uqfoundation

person Mike McKerns    schedule 05.03.2015
comment
Хотя в идеале я бы хотел сделать это со стандартным дистрибутивом, но поскольку, если это невозможно, и использование пафоса кажется таким простым, я обязательно попробую. Большое спасибо за отличный ответ. Я очень надеюсь, что когда-нибудь укроп заменит pickle в стандартном дистрибутиве Python. - person Naman; 06.03.2015
comment
Извините, что комментирую так поздно, но я хочу знать, есть ли какой-нибудь единый пакет пафоса, который я могу установить из одного tar со всеми зависимостями, такими как dill, pyina и pox? - person Naman; 13.03.2015
comment
@Naman: нет, дистрибутива, содержащего все зависимости, не существует. Однако вы можете установить версии на github с помощью setuptools или с помощью pip, немного поработав. Новый выпуск немного просрочен, но скоро должен быть доступен. - person Mike McKerns; 13.03.2015
comment
Спасибо за ответ и помощь. Было бы здорово получить новый релиз pip. - person Naman; 13.03.2015

Модуль pickle не может сериализовать лямбда-функции, потому что все они имеют одинаковое имя (<lambda>). Просто используйте обычную функцию, и она должна работать.

person cdonts    schedule 05.03.2015