gevent блокирует запрос сокета redis

ЦЕЛЬ: создать несколько обработчиков Greenlet для получения данных из Redis (вытащить из Redis, а затем поставить в очередь).

РАБОТАЮЩАЯ ОКРУЖАЮЩАЯ СРЕДА: Ubuntu 12.04 PYTHON ВЕРСИЯ: 2.7 GEVENT ВЕРСИЯ: 1.0 RC2 REDIS ВЕРСИЯ: 2.6.5 REDIS-PY ВЕРСИЯ: 2.7.1

from gevent import monkey; monkey.patch_all()
import gevent
from gevent.pool import Group
from gevent.queue import JoinableQueue
import redis

tasks = JoinableQueue()
task_group = Group()

def crawler():
    while True:
        if not tasks.empty():
            print tasks.get()
            gevent.sleep()

task_group.spawn(crawler)
redis_client = redis.Redis()
data = redis_client.lpop('test') #<----------Block here
tasks.put(data)

Попробуйте извлечь данные из Redis, но он заблокирован ... и никаких исключений не возникает ... просто заморозьте и удалите метод создания, он сработает ... я не понимаю, что произошло, помогите, пожалуйста! Спасибо!


person XIO    schedule 29.12.2012    source источник


Ответы (1)


gevent предоставляет совместные облегченные процессы (не потоки). Следствием этого является то, что когда у вас где-то есть бесконечный цикл, а планировщик никогда не запускается повторно, программа блокируется, используя 100% ядра ЦП.

В вашем примере проблема заключается в том, как вы определили цикл сканирования. Очевидно, у вас есть бесконечный цикл, когда задачи пусты. И поскольку вызов gevent.sleep (который будет выполнять необходимую операцию yield) вызывается только тогда, когда задачи не пусты, это означает, что планировщик никогда не входит повторно.

Кажется, что команда lpop блокируется, потому что соединение задерживается клиентом Redis. Последовательность событий такова:

  • создана рабочая группа; но гринлет пока не запланирован
  • redis_client построен, но еще не генерирует ввод-вывод, так как фактическое соединение задерживается
  • lpop называется; на этот раз соединение действительно необходимо, потому что клиент Redis должен ждать соединения и ответа на lpop; поэтому он уступает планировщику
  • планировщик активирует поискового робота
  • бесконечный цикл, так как очередь задач все еще пуста

Если вы поместите gevent.sleep() в сам цикл (после if), он будет работать лучше, но это все еще неэффективный способ реализации исключения из очереди. Что-то вроде этого было бы намного лучше:

def crawler():
    while True:
        x = tasks.get()
        try:
            print "Crawler: ",x
        finally:
            tasks.task_done()

Вызов get() блокирует обработчик, поэтому он позволяет избежать игры в пинг-понг между обработчиком и планировщиком, пока очередь пуста.

person Didier Spezia    schedule 29.12.2012
comment
Я не знаю, но я все еще остаюсь при своем ответе ;-) - person Didier Spezia; 17.01.2013