Реакция на отключение клиента с помощью бутылки и gevent.wsgi?

У меня есть небольшой асинхронный сервер, реализованный с использованием bottle и gevent.wsgi. Существует процедура, используемая для реализации длинного опроса, которая очень похожа на пример «обратных вызовов событий» в документация по бутылке:

def worker(body):
  msg = msgbus.recv()
  body.put(msg)
  body.put(StopIteration)

@route('/poll')
def poll():
  body = gevent.queue.Queue()
  worker = gevent.spawn(worker, body)
  return body

Здесь msgbus — это сокет ZMQ sub.

Все это прекрасно работает, но если клиент разорвет соединение, пока worker заблокирован на msgbus.recv(), эта задача гринлета будет висеть "вечно" (ну, пока не будет получено сообщение) и узнает об отключенном клиенте только при попытке отправить ответ.

Я могу использовать msgbus.poll(timeout=something), если я не хочу вечно блокироваться в ожидании сообщений ipc, но я все еще не могу обнаружить отключение клиента.

Что я хочу сделать, так это получить что-то вроде ссылки на клиентский сокет, чтобы я мог использовать его в каком-то цикле select или poll, или получить какое-то асинхронное уведомление внутри моего гринлета, но я Я не уверен, как выполнить любую из этих вещей с помощью этих фреймворков (bottle и gevent).

Есть ли способ получать уведомления об отключении клиента?


person larsks    schedule 21.11.2013    source источник


Ответы (1)


Ага! Переменная wsgi.input, по крайней мере, под gevent.wsgi, имеет член rfile, который является файлоподобным объектом. Похоже, это не требуется согласно спецификации WSGI, поэтому он может не работать с другими серверами.

Благодаря этому я смог изменить свой код, чтобы он выглядел примерно так:

def worker(body, rfile):
  poll = zmq.Poller()
  poll.register(msgbus)
  poll.register(rfile, zmq.POLLIN)

  while True:
    events = dict(poll.poll())

    if rfile.fileno() in events:
      # client disconnect!
      break

    if msgbus in events:
      msg = msgbus.recv()
      body.put(msg)
      break

    body.put(StopIteration)

@route('/poll')
def poll():
  rfile = bottle.request.environ['wsgi.input'].rfile
  body = gevent.queue.Queue()
  worker = gevent.spawn(worker, body, rfile)
  return body

И это отлично работает...

...кроме OpenShift, где вам придется использовать альтернативный интерфейс на порту 8000 с поддержкой веб-сокетов.

person larsks    schedule 21.11.2013