Очередь Python asyncio не обновляется

Следующий код создает экземпляр объекта asyncio.Queue и пытается заполнить и использовать эту очередь из двух разных сопрограмм, соответственно arrival() и server().

loop = asyncio.get_event_loop()
q = asyncio.Queue()

async def arrival(q):
    print('ARRIVAL - Queue id:', id(q))

    while True:
        await asyncio.sleep(1)
        item = random.choice(['item1', 'item2'..., 'item100'])
        q.put(item)

        print('ARRIVAL - added {}, qsize is now {}'.format(item, q.qsize()))


async def server(q):
    print('SERVER - Queue id:', id(q))

    while True:
        item = await q.get()
        print('SERVER - taking {}, qsize is now {}'.format(item, q.qsize()))

        await asyncio.sleep(1.8)
        print('SERVER - finished processing {}'.format(item))


tasks = [loop.create_task(arrival(q)), loop.create_task(server(q))]
loop.run_until_complete(asyncio.gather(*tasks))

Принцип следующий:

  • Каждую 1 секунду элемент добавляется в q
  • Каждый раз, когда сервер свободен, он берет следующий элемент в очереди или ждет его.
  • Серверу требуется 1,8 секунды для обработки элемента

Ожидаемый результат будет следующим:

SERVER - Queue id: 12345678
ARRIVAL - Queue id: 12345678
ARRIVAL - added item1, qsize is now 1
SERVER - taking item1, qsize is now 0
ARRIVAL - added item2, qsize is now 1
SERVER - finished processing item1
SERVER - taking item2, qsize is now 0
ARRIVAL - added item3, qsize is now 1
ARRIVAL - added item4, qsize is now 2
SERVER - finished processing item2
SERVER - taking item3, qsize is now 1
ARRIVAL - added item5, qsize is now 2
ARRIVAL - added item6, qsize is now 3
SERVER - finished processing item3
SERVER - taking item4, qsize is now 2

Однако, когда я запускаю приведенный выше код, элементы в цикле while True в server() никогда не выполняются, q.qsize() всегда равен 0, а вывод:

SERVER - Queue id: 12345678
ARRIVAL - Queue id: 12345678
ARRIVAL - added item1, qsize is now 0
ARRIVAL - added item2, qsize is now 0
ARRIVAL - added item3, qsize is now 0
ARRIVAL - added item4, qsize is now 0
ARRIVAL - added item5, qsize is now 0
...

Кажется, что объект q никогда не обновляется arrival() (q.qsize() всегда равен 0) и, следовательно, server() никогда не знает об элементах, добавленных arrival().


person Jivan    schedule 31.12.2016    source источник


Ответы (2)


я получил это так, как вы хотите:

import asyncio
import random

random.seed(31415)  # get reproducible runs

ITEMS = ['item{}'.format(i) for i in range(100)]

async def arrival(q):
    queue_object_id = id(q)
    print('ARRIVAL - Queue id:', queue_object_id)
    while True:
        await asyncio.sleep(1)
        item = random.choice(ITEMS)
        await q.put(item)
        size = q.qsize()
        print('ARRIVAL - added {}, qsize is now {}'.format(item, size))

async def server(q):
    queue_object_id = id(q)
    print('SERVER - Queue id:', queue_object_id)

    while True:
        item = await q.get()
        size = q.qsize()
        print('SERVER - taking {}, qsize is now {}'.format(item, size))
        await asyncio.sleep(1.8)
        print('SERVER - finished processing {}'.format(item))

loop = asyncio.get_event_loop()
q = asyncio.Queue()
cors = asyncio.wait([arrival(q), server(q)])
loop.run_until_complete(cors)

к сожалению, я не отследил все изменения, которые мне пришлось внести... извините. но я уверен, что вы узнаете различия и почему они имеют значение.

это производит вывод:

SERVER - Queue id: 140540011741592
ARRIVAL - Queue id: 140540011741592
ARRIVAL - added item75, qsize is now 1
SERVER - taking item75, qsize is now 0
ARRIVAL - added item36, qsize is now 1
SERVER - finished processing item75
SERVER - taking item36, qsize is now 0
ARRIVAL - added item57, qsize is now 1
ARRIVAL - added item5, qsize is now 2
SERVER - finished processing item36
SERVER - taking item57, qsize is now 1
ARRIVAL - added item69, qsize is now 2
ARRIVAL - added item67, qsize is now 3
SERVER - finished processing item57
SERVER - taking item5, qsize is now 2
ARRIVAL - added item53, qsize is now 3
ARRIVAL - added item16, qsize is now 4
SERVER - finished processing item5
SERVER - taking item69, qsize is now 3
ARRIVAL - added item91, qsize is now 4
...
person hiro protagonist    schedule 31.12.2016
comment
Спасибо! Значимым битом был вызов await q.put(item) вместо просто q.put(item). Имеет смысл, так как q.put — это сопрограмма, но я как-то пропустил это. - person Jivan; 31.12.2016
comment
@Jivan да, хотел добавить, что это была одна из самых важных вещей, которые я изменил ... извините. & с Новым Годом! - person hiro protagonist; 31.12.2016
comment
С другой стороны, замена asyncio.gather() на asyncio.wait(), похоже, не имеет никакого эффекта, по крайней мере, в этом случае. Я должен исследовать разницу в документах. С Новым Годом! - person Jivan; 31.12.2016

TL&DR: при использовании асинхронной очереди обязательно ждите как put(), так и get().

Рациональность: обе функции get() и put() в асинхронной очереди являются сопрограммами, поэтому их нужно ожидать.

Пример:

await q.put(item)
await q.get(item)
person FlyingV    schedule 08.06.2020