asyncio queue потребительская сопрограмма

У меня есть подкласс asyncio.Protocol, получающий данные с сервера. Я сохраняю эти данные (каждую строку, потому что это текст) в asyncio.Queue.

import asyncio

q = asyncio.Queue()

class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        for message in data.decode().splitlines():
            yield q.put(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()

loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
                              '127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

Я хочу, чтобы другая сопрограмма отвечала за потребление данных в очереди и их обработку.

  • Это должно быть asyncio.Task?
  • Что делать, если очередь становится пустой из-за того, что в течение нескольких секунд данные не принимаются? Как я могу убедиться, что мой потребитель не остановится (run_until_complete)?
  • Есть ли более чистый способ, чем использование глобальной переменной для моей очереди?

person valentin    schedule 01.02.2016    source источник
comment
Ваш код неверен, извините: data_received должна быть обычной функцией, а не сопрограммой с yield внутри. Более того, asyncio.Queue требует yield from, а не только yield.   -  person Andrew Svetlov    schedule 01.02.2016
comment
Ах да. Я поместил это туда, не тестируя, просто чтобы дать представление о том, что я хотел сделать.   -  person valentin    schedule 01.02.2016


Ответы (1)


Это должен быть asyncio.Task?

Да, создайте его с помощью asyncio.ensure_future или loop.create_task.

Что делать, если очередь становится пустой из-за того, что в течение нескольких секунд данные не принимаются?

Просто используйте queue.get, чтобы дождаться товар доступен:

async def consume(queue):
    while True:
        item = await queue.get()
        print(item)

Есть ли более чистый способ, чем использование глобальной переменной для моей очереди?

Да, просто передайте его как аргумент сопрограмме потребителя и протоколу потока:

class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop, queue):
        self.loop = loop
        self.queue = queue

    def data_received(self, data):
        for message in data.decode().splitlines():
            self.queue.put_nowait(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()

Как я могу убедиться, что мой потребитель не остановится (run_until_complete)?

После закрытия соединения используйте queue.join ждать, пока очередь не опустеет.


Полный пример:

loop = asyncio.get_event_loop()
queue = asyncio.Queue()
# Connection coroutine
factory = lambda: StreamProtocol(loop, queue)
connection = loop.create_connection(factory, '127.0.0.1', '42')
# Consumer task
consumer = asyncio.ensure_future(consume(queue))
# Set up connection
loop.run_until_complete(connection)
# Wait until the connection is closed
loop.run_forever()
# Wait until the queue is empty
loop.run_until_complete(queue.join())
# Cancel the consumer
consumer.cancel()
# Let the consumer terminate
loop.run_until_complete(consumer)
# Close the loop
loop.close()

В качестве альтернативы вы также можете использовать потоки:

async def tcp_client(host, port, loop=None):
    reader, writer = await asyncio.open_connection(host, port, loop=loop)
    async for line in reader:
        print(line.rstrip())
    writer.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_client('127.0.0.1', 42, loop))
loop.close()
person Vincent    schedule 01.02.2016
comment
Спасибо! Похоже, правильный способ это сделать. Я думаю, что есть проблема с вашим полным примером, переменная coro не существует - person valentin; 01.02.2016
comment
@toogy Верно, только что поправил. - person Vincent; 01.02.2016
comment
Идеально. И последнее. Что, если я хочу, чтобы мой потребитель был чем-то большим, чем функция (я имею в виду класс)? Должен ли я просто унаследовать класс asyncio.Task? - person valentin; 01.02.2016
comment
@toggy Нет, просто пусть ваш класс определяет сопрограммы, которые вы можете запланировать как задачу, используя asyncio.ensure_future. - person Vincent; 01.02.2016
comment
Еще раз спасибо Винсент - person valentin; 03.02.2016
comment
Могу я спросить, как, используя этот сценарий, можно было бы запустить коммуникацию как сервер в своем собственном потоке. Основной поток создает цикл событий recvQ; sendQ (очередь импорта) и передает их как аргументы в поток связи. Протокол TCP имеет всего 3 метода: connection_made; Данные получены; соединение потеряно. Я поднял вопрос 42949622, но не получил ответа. Основная задача должна ставить данные в очередь для отправки и получения данных после того, как метод data_received пересылает их (стандартная очередь помещает и получает оба конца). - person Cliff; 20.04.2017