Параллельный будущий опрос серии блокирующих вызовов

Я пытаюсь создать механизм опроса для длительной задачи в Python. Для этого я использую параллельное Future и опрос с помощью .done(). Задача состоит из множества итераций, которые сами по себе блокируются, и я завернул их в асинхронную функцию. У меня нет доступа к коду блокирующих функций, так как я звоню стороннему ПО. Это минимальный пример моего текущего подхода:

import asyncio
import time

async def blocking_iteration():
    time.sleep(1)

async def long_running():
    for i in range(5):
        print(f"sleeping {i}")
        await blocking_iteration()

async def poll_run():
    future = asyncio.ensure_future(long_running())
    while not future.done():
        print("before polling")
        await asyncio.sleep(0.05)
        print("polling")
    future.result()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(poll_run())
    loop.close()

Результатом этого является:

before polling
sleeping 0
sleeping 1
sleeping 2
sleeping 3
sleeping 4
polling

Исходя из моего текущего понимания механизма asyncio в Python, я ожидал, что цикл разблокируется после первого сна, вернет управление циклу, который вернется к оператору poll_run await и запустит вторую итерацию функции long_running только после того, как последующий опрос. Таким образом, желаемый результат выглядит примерно так:

before polling
sleeping 0
polling
before polling
sleeping 1
polling
before polling
sleeping 2
polling
before polling
sleeping 3
polling
before polling
sleeping 4
polling

Можно ли этого как-то добиться при нынешнем подходе или можно по-другому?

ИЗМЕНИТЬ

Благодаря @drjackild удалось решить эту проблему, изменив

async def blocking_iteration():
    time.sleep(1)

в

def blocking():
    time.sleep(1)

async def blocking_iteration():
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, blocking)

person user2416984    schedule 26.03.2020    source источник


Ответы (1)


time является синхронной библиотекой и блокирует весь основной поток при выполнении. Если в вашей программе есть такие блокирующие вызовы, вы можете избежать блокировки с помощью исполнителей пула потоков или процессов (вы можете прочитать об этом здесь). Или измените blocking_iteration, чтобы использовать asyncio.sleep вместо time.sleep.

УПД. Просто чтобы было понятно, вот неблокирующая версия, в которой используется loop.run_in_executor с исполнителем по умолчанию. Пожалуйста, обратите внимание, что blocking_iteration теперь без async

import asyncio
import concurrent.futures
import time

def blocking_iteration():
    time.sleep(1)

async def long_running():
    loop = asyncio.get_event_loop()
    for i in range(5):
        print(f"sleeping {i}")
        await loop.run_in_executor(None, blocking_iteration)

async def poll_run():
    task = asyncio.create_task(long_running())
    while not task.done():
        print("before polling")
        await asyncio.sleep(0.05)
        print("polling")
    print(task.result())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(poll_run())
    loop.close()
person drjackild    schedule 26.03.2020