Управление многопроцессорными процессами Python с разным использованием памяти

Я использую простую очередь RabbitMQ для распределения задач по рабочим процессам. Каждый рабочий процесс использует пул из multiprocessing экземпляров для одновременной работы над несколькими задачами, чтобы максимально использовать память и ЦП.

Проблема в том, что некоторые задачи занимают гораздо больше оперативной памяти, чем другие, так что рабочий процесс рухнет, если он запустит более одного экземпляра. Но пока рабочий работает над задачей с интенсивным использованием ОЗУ, я бы хотел, чтобы он работал над другими задачами с меньшим объемом ОЗУ, чтобы использовать остальные процессоры.

Одной из идей было бы использование нескольких очередей или тем, но мне интересно, каков рекомендуемый подход. Могу ли я отловить ошибки нехватки памяти до того, как они приведут к сбою процесса?

Каким будет правильный подход к решению этой проблемы?

[обновленное обновление]

Там вся система будет состоять из нескольких многоядерных машин, но на каждой многоядерной машине работает только одна рабочая программа, которая создает столько многопроцессорных экземпляров, сколько ядер. Разные машины должны быть независимы друг от друга, за исключением того, что они получают свои задачи из одной и той же очереди.


person Framester    schedule 28.07.2014    source источник
comment
Это все делается на одной машине?   -  person dano    schedule 28.07.2014
comment
Да, все на одной машине.   -  person Framester    schedule 29.07.2014
comment
Сколько у него ядер? У вас есть несколько рабочих процессов, каждый из которых создает несколько процессов внутри multiprocessing.Pool. Это звучит как много тотальных процессов. Если у вас не столько ядер, сколько процессов, вы только замедляетесь (и тратите память). Кроме того, зачем иметь несколько рабочих процессов, если все они потребляют из одной и той же очереди, а затем отправляют работу еще одному рабочему процессу, который является частью Pool? Почему бы не иметь один рабочий процесс, управляющий одним Pool?   -  person dano    schedule 29.07.2014
comment
Привет, Дано, спасибо за интерес. На каждой машине есть один рабочий, который создает столько многопроцессорных экземпляров, сколько ядер.   -  person Framester    schedule 29.07.2014
comment
Я думал, вы сказали, что есть только одна машина?   -  person dano    schedule 29.07.2014
comment
Привет, извините, я был не точен. Есть несколько машин, но они должны быть независимы друг от друга. По этой причине я изначально описал, как система должна работать на одной машине.   -  person Framester    schedule 30.07.2014
comment
Спасибо, это делает вашу архитектуру более понятной :)   -  person dano    schedule 30.07.2014


Ответы (1)


Я думаю, что пытаться поймать и исправить ошибки OOM будет очень сложно, если не невозможно. Вам понадобится работающий поток или процесс, который постоянно отслеживает использование памяти, и когда он обнаруживает, что она слишком высока, делает... что именно? Убивает процесс, обрабатывающий задачу? пытается приостановить его (если это возможно; это может не зависеть от того, что делают ваши задачи). Даже в этом случае приостановка не освободит память. Вам придется освободить память и перезапустить задачу, когда она будет безопасна, а это значит, что вам придется повторно поставить ее в очередь, решить, когда она безопасна, и т. д.

Вместо того, чтобы пытаться обнаружить проблему и устранить ее, я бы рекомендовал вообще избегать ее. Создайте две очереди и два пула. Одна очередь/пул для задач с большим объемом памяти и другая очередь/пул для задач с малым объемом памяти. В пуле с большой памятью будет только один процесс, поэтому он будет ограничен одновременным выполнением одной задачи, что экономит вашу память. Очередь с малым объемом памяти будет иметь multiprocessing.cpu_count() - 1 процессов, что позволит вам поддерживать насыщение ЦП в двух пулах.

Одна потенциальная проблема с этим подходом заключается в том, что если вы исчерпаете очередь с большим объемом памяти, но все еще имеете ожидающие выполнения задачи с низким объемом памяти, вы будете тратить впустую один из ваших ЦП. Вы можете справиться с этим потреблением из очереди с большим объемом памяти неблокирующим способом (или с тайм-аутом), так что, если очередь с большим объемом памяти пуста, когда вы готовы использовать задачу, вы можете получить меньший объем памяти. вместо этого задача памяти. Затем, когда вы закончите его обработку, снова проверьте очередь высокой памяти.

Что-то вроде этого:

import multiprocessing

# hi_q and lo_q are placeholders for whatever library you're using to consume from RabbitMQ

def high_mem_consume():
    while True:
       task = hi_q.consume(timeout=2)
       if not task:
          lo_q.consume(timeout=2)
       if task:
           process_task(task)


def low_mem_consume():
    while True:
        task = lo_q.consume()  # Blocks forever
        process_task(task)


if __name__ == "__main__":

    hi_pool = multiprocessing.Pool(1)
    lo_pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1)
    hi_pool.apply_async(high_mem_consume)
    lo_pool.apply_async(lo_mem_consume)
person dano    schedule 30.07.2014