Инициализация многопроцессного пула с последовательным аргументом инициализатора

У меня есть код, подобный следующему:

import multiprocessing as mp

connection: module.Connection

def client_id():
    for i in range(mp.cpu_count*2):
        yield i

def initproc(host: str, port: int, client_id: int):
    global connection
    connection.connect(host, port, client_id)

def main():
    host = "something"
    port = 12345
    mp.get_context("spawn").Pool(processes=mp.cpu_count()*2,
                                 initializer=initproc,
                                 initargs=(host, port, client_id())) as p:
        res = p.starmap(processing_function, arg_list)
    

для целей вопроса processing_function и arg_list не имеют значения.

Проблема в том, что я получаю сообщение об ошибке:

    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'generator' object

Есть ли способ создать процесс инициализации в пуле таким образом, чтобы один из аргументов для его инициализации был бы следующим числом в последовательности?

P.S. В коде, как написано, может быть возможно инициализировать все объекты соединения за пределами функции инициализатора, но в моем конкретном случае это не так. Мне нужно передать аргументы для подключения в инициализатор.


person Karlson    schedule 24.08.2020    source источник
comment
Есть несколько вещей, на которые вы должны обратить внимание, потому что они, скорее всего, не делают того, что вы думаете. global и multiprocessing не смешиваются — у вас будет новый connection в каждом процессе. initproc говорит, что ожидает client_id: int, но вместо этого вы передаете client_id: Iterable[int] (поскольку client_id() является генератором). Наконец, что вы собираетесь делать в конце as p?   -  person MisterMiyagi    schedule 24.08.2020
comment
as p позволяет обращаться к объекту пула как к переменной p. global необходим, потому что в противном случае переменная connection будет локальной внутри функции и не будет доступна из функций обработки данных. Iterable или нет, я даже не дошел до того, что initproc получает значение для обработки   -  person Karlson    schedule 24.08.2020


Ответы (1)


Простым решением для вашего случая будет использование порядкового номера дочернего процесса, который содержится в файле Process.name. Вы можете извлечь его с помощью...

mp.current_process().name.split('-')[1]

Если вам нужно больше контроля над тем, где начинается последовательность, вы можете использовать multiprocessing.Value в качестве счетчика, из которого рабочие получают свой уникальный номер.

import multiprocessing as mp
import time


def init_p(client_id):
    with client_id.get_lock():
        globals()['client_id'] = client_id.value
        print(f"{mp.current_process().name},"
              f" {mp.current_process().name.split('-')[1]},"  # alternative
              f" client_id:{globals()['client_id']}")
        client_id.value += 1


if __name__ == "__main__":

    ctx = mp.get_context("spawn")
    client_ids = ctx.Value('i', 0)

    with ctx.Pool(
            processes=4,
            initializer=init_p,
            initargs=(client_ids,)
    ) as pool:

        time.sleep(3)

Выход:

SpawnPoolWorker-2, 2, client_id:0
SpawnPoolWorker-3, 3, client_id:1
SpawnPoolWorker-1, 1, client_id:2
SpawnPoolWorker-4, 4, client_id:3

Process finished with exit code 0
person Darkonaut    schedule 24.08.2020