Ожидание завершения подзадачи по событию завершения внешней задачи (асинхронная версия add_done_callback)

Допустим, у нас есть некоторая задача (подзадача), которая должна быть завершена после выполнения внешней задачи. У нас нет контроля над внешней задачей: мы не знаем, когда она будет завершена (это может произойти до выполнения подзадачи), мы не можем дождаться подзадачи внутри.

В этом фрагменте мы получим предупреждение, потому что внешняя задача завершилась раньше подзадачи:

import asyncio


def create_sub_task():
    sub_task = asyncio.ensure_future(sub())
    # We want this sub_task to be finished when outer task done


async def sub():
    await asyncio.sleep(2)
    print('sub done')


async def main():  # main is outer task for sub_task
    create_sub_task()
    await asyncio.sleep(1)
    print('outer done')


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

add_done_callback выглядит как способ поймать момент завершения внешней задачи, но здесь мы не можем ждать подзадачи: эта функция синхронна.

Я нашел способ использовать частную функцию _run_once цикла событий для синхронного ожидания завершения задачи внутри обратного вызова:

import asyncio
from functools import partial


def create_sub_task():
    sub_task = asyncio.ensure_future(sub())

    # Callback to wait for sub_task
    outer_task = asyncio.Task.current_task()
    outer_task.add_done_callback(partial(_stop_task, sub_task))


async def sub():
    await asyncio.sleep(2)
    print('sub done')


def _stop_task(sub_task, task):
    # Ugly way to wait sub_task finished:
    loop = asyncio.get_event_loop()
    while not sub_task.done():
        loop._run_once()


async def main():  # main is outer task for sub_task
    create_sub_task()
    await asyncio.sleep(1)
    print('outer done')


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Это работает, но это уродливый способ со многими возможными проблемами.

Есть идеи, как лучше решить задачу?


person Mikhail Gerasimov    schedule 08.06.2016    source источник


Ответы (1)


На мой взгляд, без внутренностей это не решить. Лично я бы asyncio.gather выполнил внешние и подзадачи в одном будущем, а затем переписал бы обратные вызовы.

К сожалению, список обратных вызовов Future не отображается в общедоступном интерфейсе (я использую _callbacks):

import asyncio

def create_sub_task():
    sub_task = asyncio.ensure_future(sub())
    outer_task = asyncio.Task.current_task()

    multi_fut = asyncio.gather(sub_task, outer_task)
    for cb in outer_task._callbacks:
        multi_fut.add_done_callback(cb)
        outer_task.remove_done_callback(cb)

async def sub():
    await asyncio.sleep(2)
    print('sub done')


async def main():  # main is outer task for sub_task
    create_sub_task()
    await asyncio.sleep(1)
    print('outer done')


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Я предполагаю, что вы не хотите или не можете изменить поток, но я призываю вас переосмыслить. Может быть, опубликовать контекст - происхождение ограничений.

person kwarunek    schedule 08.06.2016
comment
Спасибо за ответ. Первоначально я столкнулся с проблемой, пытаясь реализовать своего рода асинхронный генератор здесь stackoverflow.com/a/37572657/1113207 (это ранняя версия кода) Подзадача - это тело генератора. Я не знаю, завершится ли задача, но мне нужно остановить ее, когда завершится внешняя задача (см. функцию _cleanup). - person Mikhail Gerasimov; 09.06.2016