Поведение многопроцессорной очереди Python

У меня есть этот код:

def f(x, y):
    def _f(x, y, queue):
        res = get_data(x, y)
        queue.put(res)

    q = Queue()
    p = Process(target=_f, args=(x, y, q))
    p.start()
    res = q.get()
    p.join()

    return res

Это работает отлично. Но когда я делаю res = q.get() после p.join() вот так:

q = Queue()
p = Process(target=_f, args=(x, y, q))
p.start()
p.join()
return q.get()

Для несколько больших данных (например, несколько сотен элементов массива JSON) он будет зависать на p.join().

Почему это?


person wiseodd    schedule 05.03.2016    source источник


Ответы (1)


Вероятно, вы попали в состояние гонки. Если вы захотите взглянуть на исходный код multiprocessing.Queue, вы заметите, что он использует multiprocessing.Pipe и threading.Thread для связи между экземплярами multiprocessing.Process. Итак, когда вы вызываете Queue.put, вы фактически просите вышеупомянутый поток записать объект в другой Process, используя Connection.send, созданный Pipe. Когда вы вызываете Queue.get, вы фактически потребляете данные, полученные через Connection.recv, которые составляют другой конец Pipe.

Теперь, если вы углубитесь в multiprocessing.Pipe и создаваемые им экземпляры Connection, вы обнаружите, что они используют низкоуровневые интерфейсы ОС для реализации большинства своих функций. Однако, используя такие интерфейсы, писатель должен соблюдать верхний предел того, сколько данных он может передать Connection.send без того, чтобы другой Process или Thread вызывал соответствующий Connection.recv.

В вашем конкретном случае Process для _f заканчивается, как только возвращается queue.put(res) (т. е. сразу после того, как вы «попросите» Thread написать res). Единственное, что мешает завершить Process, это ожидание Connection.send; после этого он свободен. Если объект небольшой и соответствует ограничениям ОС, Conneciton.send немедленно вернется, и весь процесс завершится. С другой стороны, если объект больше, чем ограничение ОС, Connection.send заблокируется, ожидая, пока Connection.recv освободит место для оставшихся данных. Однако этого никогда не происходит, потому что сразу после вызова Process.start вы вызываете Process.join, который будет блокироваться до тех пор, пока не закончится Process; концовка, которая произойдет только после того, как вы позвоните Connection.recv; вызов, который произойдет только после возврата Process.join; и мы замыкаем нашу тупиковую цепочку.

person PEdroArthur    schedule 25.09.2018