Есть ли простая параллельная карта на основе процессов для python?

Я ищу простую параллельную карту на основе процессов для python, то есть функцию

parmap(function,[data])

который будет запускать функцию для каждого элемента [data] в другом процессе (ну, на другом ядре, но AFAIK, единственный способ запускать материал на разных ядрах в python - запустить несколько интерпретаторов) и вернуть список результатов .

Что-то подобное существует? Я хочу что-нибудь простое, поэтому простой модуль был бы неплохим. Конечно, если такого не существует, я довольствуюсь большой библиотекой: - /


person static_rtti    schedule 09.11.2009    source источник


Ответы (5)


Мне кажется, что вам нужен метод в многопроцессорной обработке. Пул ():

map (func, iterable [, chunksize])

A parallel equivalent of the map() built-in function (it supports only
one iterable argument though). It blocks till the result is ready.

This method chops the iterable into a number of chunks which it submits to the 
process pool as separate tasks. The (approximate) size of these chunks can be 
specified by setting chunksize to a positive integ

Например, если вы хотите отобразить эту функцию:

def f(x):
    return x**2

в range (10), вы можете сделать это с помощью встроенной функции map ():

map(f, range(10))

или используя метод map () объекта multiprocessing.Pool ():

import multiprocessing
pool = multiprocessing.Pool()
print pool.map(f, range(10))
person Flávio Amieiro    schedule 09.11.2009
comment
Если вы вызываете это из долгоживущей программы, обязательно вызовите pool.close (в идеале в блоке finally включающего try/finally). В противном случае пул может не очистить дочерние процессы, и вы можете получить зомби-процессы. См. bugs.python.org/issue19675. - person rogueleaderr; 02.11.2016
comment
@rogueleaderr Разве не было бы более идиоматично использовать with? - person CodeMonkey; 15.12.2016
comment
Хороший момент @CodeMonkey! В первом примере в официальных документах используется with, поэтому очистка должна выполняться хорошо . - person rogueleaderr; 15.12.2016
comment
PicklingError: Can't pickle <function <lambda> at 0x121572bf8>: attribute lookup <lambda> on __main__ failed почему он не может работать lambda? - person O.rka; 22.04.2017
comment
Я нашел здесь действительно хороший пример, немного более сложный, чем предыдущий: blog.adeel.io/2016/11/06/parallelize-pandas-map-or-apply - person Rafael Valero; 21.08.2018

Это можно сделать элегантно с помощью Ray, системы, которая позволяет легко распараллеливать и распространять код Python. .

Чтобы распараллелить ваш пример, вам нужно определить функцию карты с помощью декоратора @ray.remote, а затем вызвать ее с помощью .remote. Это гарантирует, что каждый экземпляр удаленной функции будет выполняться в другом процессе.

import time
import ray

ray.init()

# Define the function you want to apply map on, as remote function. 
@ray.remote
def f(x):
    # Do some work...
    time.sleep(1)
    return x*x

# Define a helper parmap(f, list) function.
# This function executes a copy of f() on each element in "list".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e., 
# an identifier of the result) rather than the result itself.  
def parmap(f, list):
    return [f.remote(x) for x in list]

# Call parmap() on a list consisting of first 5 integers.
result_ids = parmap(f, range(1, 6))

# Get the results
results = ray.get(result_ids)
print(results)

Это напечатает:

[1, 4, 9, 16, 25]

и он закончится примерно через len(list)/p (округленное до ближайшего целого числа), где p - количество ядер на вашем компьютере. Предполагая, что машина с 2 ядрами, наш пример будет выполняться за 5/2 с округлением, то есть примерно за 3 секунд.

Использование Ray дает ряд преимуществ перед модулем multiprocessing. В частности, один и тот же код будет работать как на отдельном компьютере, так и на кластере машин. Дополнительные преимущества Ray см. В этой связанной публикации.

person Ion Stoica    schedule 06.02.2019

Для тех, кто ищет Python-эквивалент R mclapply (), вот моя реализация. Это улучшение следующих двух примеров:

Его можно применять к функциям карты с одним или несколькими аргументами.

import numpy as np, pandas as pd
from scipy import sparse
import functools, multiprocessing
from multiprocessing import Pool

num_cores = multiprocessing.cpu_count()

def parallelize_dataframe(df, func, U=None, V=None):

    #blockSize = 5000
    num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) )
    blocks = np.array_split(df, num_partitions)

    pool = Pool(num_cores)
    if V is not None and U is not None:
        # apply func with multiple arguments to dataframe (i.e. involves multiple columns)
        df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks))
    else:
        # apply func with one argument to dataframe (i.e. involves single column)
        df = pd.concat(pool.map(func, blocks))

    pool.close()
    pool.join()

    return df

def square(x):
    return x**2

def test_func(data):
    print("Process working on: ", data.shape)
    data["squareV"] = data["testV"].apply(square)
    return data

def vecProd(row, U, V):
    return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) )

def mProd_func(data, U, V):
    data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 )
    return data

def generate_simulated_data():

    N, D, nnz, K = [302, 184, 5000, 5]
    I = np.random.choice(N, size=nnz, replace=True)
    J = np.random.choice(D, size=nnz, replace=True)
    vals = np.random.sample(nnz)

    sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D])

    # Generate parameters U and V which could be used to reconstruct the matrix Y
    U = np.random.sample(N*K).reshape([N,K])
    V = np.random.sample(D*K).reshape([D,K])

    return sparseY, U, V

def main():
    Y, U, V = generate_simulated_data()

    # find row, column indices and obvseved values for sparse matrix Y
    (testI, testJ, testV) = sparse.find(Y)

    colNames = ["obsI", "obsJ", "testV", "predV", "squareV"]
    dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float}

    obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames)
    obsValDF["obsI"] = testI
    obsValDF["obsJ"] = testJ
    obsValDF["testV"] = testV
    obsValDF = obsValDF.astype(dtype=dtypes)

    print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape))

    # calculate the square of testVals    
    obsValDF = parallelize_dataframe(obsValDF, test_func)

    # reconstruct prediction of testVals using parameters U and V
    obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V)

    print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape))
    print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:])

if __name__ == '__main__':
    main()
person Good Will    schedule 21.01.2019

Я знаю, что это старый пост, но на всякий случай я написал инструмент, чтобы сделать этот супер-супер простой инструмент под названием parmapper (я на самом деле называю это parmap в моем использовании, но имя было взято).

Он выполняет множество операций по настройке и деконструкции процессов и добавляет множество функций. В грубом порядке важности

  • Может принимать лямбда и другие неприступные функции
  • Может применять звездную карту и другие подобные методы вызова, чтобы упростить непосредственное использование.
  • Может разделяться между потоками и / или процессами
  • Включает такие функции, как индикаторы выполнения

Это связано с небольшими затратами, но для большинства применений это незначительно.

Надеюсь, вы сочтете это полезным.

(Примечание: он, как и map в Python 3+, возвращает итерацию, поэтому, если вы ожидаете, что все результаты пройдут через нее немедленно, используйте list())

person Justin Winokur    schedule 10.05.2019

В классе Python3 Pool есть метод map (), и это все, что вам нужно для распараллеливания карты:

from multiprocessing import Pool

with Pool() as P:
    xtransList = P.map(some_func, a_list)

Использование with Pool() as P аналогично пулу процессов и будет выполнять каждый элемент в списке параллельно. Вы можете указать количество ядер:

with Pool(processes=4) as P:
person bresson    schedule 20.12.2020