явный switch() с gevent

У меня есть примитивный сценарий производителя/потребителя, работающий в gevent. Он запускает несколько функций-производителей, которые помещают объекты в gevent.queue.Queue, и одну функцию-потребитель, которая снова извлекает их из очереди:

from __future__ import print_function

import time

import gevent
import gevent.queue
import gevent.monkey

q = gevent.queue.Queue()

# define and spawn a consumer
def consumer():
    while True:
        item = q.get(block=True)
        print('consumer got {}'.format(item))

consumer_greenlet = gevent.spawn(consumer)

# define and spawn a few producers
def producer(ID):
    while True:
        print("producer {} about to put".format(ID))
        q.put('something from {}'.format(ID))
        time.sleep(0.1)
#       consumer_greenlet.switch()      

producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]

# wait indefinitely
gevent.monkey.patch_all()
print("about to join")
consumer_greenlet.join()

Он отлично работает, если я позволяю gevent обрабатывать планирование неявно (например, вызывая time.sleep или какую-либо другую функцию gevent.monkey.patch()ed), однако, когда я переключаюсь на потребителя явно (заменяю time.sleep закомментированным вызовом switch), gevent вызывает AssertionError:

Traceback (most recent call last):
  File "/my/virtualenvs/venv/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "switch_test.py", line 14, in consumer
    item = q.get(block=True)
  File "/my/virtualenvs/venv/lib/python2.7/site-packages/gevent/queue.py", line 201, in get
    assert result is waiter, 'Invalid switch into Queue.get: %r' % (result, )
AssertionError: Invalid switch into Queue.get: ()
<Greenlet at 0x7fde6fa6c870: consumer> failed with AssertionError

Я хотел бы использовать явное переключение, потому что в производстве у меня много производителей, планирование gevent не выделяет достаточно времени выполнения потребителю, а очередь становится все длиннее и длиннее (что плохо). В качестве альтернативы приветствуется любое понимание того, как настроить или изменить планировщик gevent.

Это на Python 2.7.2, gevent 1.0.1 и greenlet 0.4.5.


person Simon    schedule 03.08.2015    source источник
comment
Может быть, вы могли бы посмотреть на размер очереди и сделать паузу в производстве, если она превышает определенный размер? Похоже, основная проблема заключается в том, что производители (если их не контролировать) производят намного больше, чем может выдержать потребитель? Почему вы используете для этого gevent, а не потоки или многопроцессорность?   -  person Tom Dalton    schedule 03.08.2015
comment
На самом деле я уже реализовал предложенное вами ожидание, но это приводит к большому количеству ожиданий на стороне производителя, что приводит к снижению пропускной способности системы (и этой системе действительно нужно быстро выполнять задачи...). Потребитель, безусловно, может справиться с нагрузкой, производители загружают и анализируют URL-адреса, и все, что делает потребитель, — это записывает результаты в базу данных. Я использую gevent, потому что он уменьшает головную боль синхронизации, которую создают потоки, а также потому, что многопроцессорность потребляет значительный объем памяти, когда создается более нескольких десятков рабочих процессов.   -  person Simon    schedule 03.08.2015
comment
Я думаю, что я что-то упускаю здесь. Если производители могут производить больше, чем может выдержать потребитель, то, конечно же, у вас есть только один вариант — добавить больше потребителей или уменьшить объем производства производителей (что и делает ожидание). Я не уверен, что вы имеете в виду под снижением пропускной способности системы - не является ли узкое место ограничением того, насколько быстро потребитель может обработать очередь? Какие проблемы синхронизации создает многопоточность, а не gevent?   -  person Tom Dalton    schedule 03.08.2015
comment
Производители загружают URL-адреса из Интернета, ожидая ответа в среднем ~ 1 с, и помещают некоторые извлеченные данные в очередь (это занимает всего несколько мс). Потребитель фиксирует эти элементы данных в базе данных (это также занимает не более нескольких мс). Исходя из этих цифр, я должен иметь сотни производителей, не загружая ЦП, но gevent должен будет выделить половину процессорного времени потребителю, чего он не делает.   -  person Simon    schedule 03.08.2015
comment
Вот хорошее объяснение многопоточности и гринлетов: stackoverflow.com/questions/15556718/greenlet-vs-threads   -  person Simon    schedule 03.08.2015
comment
Этот поток не отвечает на мой вопрос о том, каких проблем с синхронизацией вы избегаете, используя gevent вместо потоковой передачи. Почему у вас есть потребность в настройке производителя/потребителя для каждого работника, получающего данные, а затем сохраняющего данные как единую «задачу»? Тогда вы сможете иметь столько рабочих, сколько захотите, без необходимости балансировать ресурсы производителя/потребителя.   -  person Tom Dalton    schedule 04.08.2015
comment
Я бы предпочел обсудить проблему, а не мой выбор дизайна, пожалуйста.   -  person Simon    schedule 04.08.2015


Ответы (1)


Мне кажется, что явное переключение не очень хорошо сочетается с неявным переключением. У вас уже есть неявное переключение либо из-за обезьяньего ввода-вывода, либо из-за того, что gevent.queue.Queue().

Документация gevent не рекомендует использовать необработанные методы гринлета:

Будучи подклассом greenlet, Greenlet также имеет методы switch() и throw(). Однако их не следует использовать на уровне приложений, поскольку они могут очень легко привести к гринлетам, которые никогда не будут запланированы. Вместо этого отдавайте предпочтение безопасным классам более высокого уровня, таким как Event и Queue.

Итерация gevent.queue.Queue() или доступ к методу get очереди выполняет неявное переключение, что интересно, put нет. Таким образом, вы должны сами сгенерировать неявный переключатель потока. Проще всего позвонить gevent.sleep(0) (на самом деле вам не нужно ждать определенное время).

В заключение вам даже не нужно заниматься обезьяньими вещами, при условии, что ваш код не имеет блокирующих операций ввода-вывода.

Я бы переписал ваш код так:

import gevent
import gevent.queue

q = gevent.queue.Queue()

# define and spawn a consumer
def consumer():
    for item in q:
        print('consumer got {}'.format(item))

consumer_greenlet = gevent.spawn(consumer)

# define and spawn a few producers
def producer(ID):
    print('producer started', ID)
    while True:
        print("producer {} about to put".format(ID))
        q.put('something from {}'.format(ID))
        gevent.sleep(0)

producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]
# wait indefinitely
print("about to join")
consumer_greenlet.join()
person sanyi    schedule 26.08.2015
comment
Вау, я полностью упустил из виду ваш ответ, спасибо! put() отсутствие неявного переключения - интересное наблюдение, я посмотрю на производственный код и посмотрю, изменит ли это что-то... - person Simon; 02.12.2015