Асинхронная задача Python3 была уничтожена, но ожидает выполнения с определенным условием

Вот упрощенный код, который использует сопрограмму python3 и устанавливает обработчик сигналов SIGING и SIGTERM для правильной остановки задания:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import asyncio
import signal
import sys


def my_handler(signum, frame):
    print('Stopping')
    asyncio.get_event_loop().stop()
    # Do some staff
    sys.exit()


@asyncio.coroutine
def prob_ip(ip_addr):

    print('Ping ip:%s' % ip_addr)
    proc = yield from asyncio.create_subprocess_exec('ping', '-c', '3', ip_addr)
    ret_code = yield from proc.wait()
    if ret_code != 0:
        print("ip:%s doesn't responding" % ip_addr)
        # Do some staff
        yield from asyncio.sleep(2)
        # Do more staff
        yield from asyncio.sleep(16)


@asyncio.coroutine
def run_probing():

    print('Start probing')
    # Do some staff
    yield from asyncio.sleep(1)

    while True:
        yield from asyncio.wait([prob_ip('192.168.1.3'), prob_ip('192.168.1.2')])
        yield from asyncio.sleep(60)


def main():
    parser = argparse.ArgumentParser()
    parser.description = "Probing ip."
    parser.parse_args()

    signal.signal(signal.SIGINT, my_handler)
    signal.signal(signal.SIGTERM, my_handler)

    asyncio.get_event_loop().run_until_complete(run_probing())


if __name__ == '__main__':
    main()

Когда я запускаю его через:

python3 test1.py

Он останавливается по Ctrl-C без каких-либо предупреждений. Но когда я запускаю его через:

python3 -m test1

Он печатает предупреждение по Ctrl-C:

$ python3 -m test1 
Start probing
Ping ip:192.168.1.2
Ping ip:192.168.1.3
PING 192.168.1.2 (192.168.1.2): 56 data bytes
PING 192.168.1.3 (192.168.1.3): 56 data bytes
^C--- 192.168.1.2 ping statistics ---
--- 192.168.1.3 ping statistics ---
1 packets transmitted, 0 packets received, 100% packet loss
1 packets transmitted, 0 packets received, 100% packet loss
Stopping
Task was destroyed but it is pending!
task: <Task pending coro=<prob_ip() running at /tmp/test1.py:22> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:394]>
Task was destroyed but it is pending!
task: <Task pending coro=<prob_ip() running at /tmp/test1.py:22> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:394]>

Такое же предупреждение я получаю, если устанавливаю этот скрипт через:

from setuptools import setup

setup(name='some_scripts',
      version='1.0.0.0',
      author='Some Team',
      author_email='[email protected]',
      url='https://www.todo.ru',
      description='Some scripts',
      packages=['my_package'],
      entry_points={'console_scripts': [
          'test1=my_package.test1:main',
      ]},
      )

Моя версия Python — «3.4.2».


person willir    schedule 03.11.2015    source источник
comment
Вы можете использовать loop.add_signal_handler. , хотя это не связано с вашей проблемой.   -  person Vincent    schedule 04.11.2015
comment
Да, спасибо, в моем случае этот способ выглядит лучше. Но это не помогает. Я все еще получаю те же предупреждения.   -  person willir    schedule 05.11.2015


Ответы (2)


В порядке. Я думаю, что понял, как мне остановить все задачи.

  1. Во-первых, насколько я понимаю. BaseEventLoop.stop() предназначен только для остановки BaseEventLoop.run_forever(). Поэтому следует отменить все задачи через Future.cancel. Чтобы получить все задачи, вы можете использовать Task.all_tasks статический метод.
  2. После отмены всех задач будет возбуждено исключение asyncio.CancelledError из run_until_complete. Поэтому его следует поймать, если вы не хотите печатать его в stderr.
  3. А также в некоторых случаях я получаю эту ошибку: TypeError: signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object. Я нашел несколько тем об этой ошибке:

    Все они говорят, что это можно исправить, закрыв цикл перед выходом из приложения.

Итак, мы получаем этот код:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import signal


def my_handler():
    print('Stopping')
    for task in asyncio.Task.all_tasks():
        task.cancel()


@asyncio.coroutine
def do_some(some_args):
    while True:
        print("Do staff with %s" % some_args)
        yield from asyncio.sleep(2)


def main():
    loop = asyncio.get_event_loop()

    loop.add_signal_handler(signal.SIGINT, my_handler)

    try:
        loop.run_until_complete(asyncio.wait([do_some(1), do_some(2)]))
    except asyncio.CancelledError:
        print('Tasks has been canceled')
    finally:
        loop.close()


if __name__ == '__main__':
    main()

Он также работает с signal.signal. Но, как заметил Винсент, loop.add_signal_handler выглядит лучше в таком случае.

Но я все еще не уверен, что это лучший способ остановить все задачи.

person willir    schedule 08.11.2015
comment
(1) в отличие от фьючерсов, задачи могут игнорировать .cancel() (поймать CancelledError) (2) если задачи необходимо очистить, вы можете loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks())) перед loop.close()< /а> - person jfs; 08.11.2015
comment
Если вы не хотите, чтобы обработчик сигналов отменял все запущенные задачи, посмотрите мой ответ (отредактированный). - person Vincent; 08.11.2015
comment
@ J.F.Sebastian Да, спасибо, мне нужно дождаться незавершенных задач перед выходом. Но (я не знаю почему) asyncio.gather не работает в этом случае (в списке в python3.5.0). asyncio.wait работает, поэтому мы получаем такой код: . Изменить: также я не знаю, почему мне не нужно ловить asyncio.CancelledError из этого (фактически второго) loop.run_until_complete. - person willir; 09.11.2015
comment
@willir: (1) я думаю, у вас неправильные аргументы (asyncio.wait() и asynio.gather() имеют разный интерфейс, например, обратите внимание на * в вызове выше). (2) если вы не понимаете, как/где/когда может возникнуть CancelledError, когда вы задаете отдельный вопрос, посвященный этому. - person jfs; 09.11.2015

Используйте asyncio.gather вместо asyncio.wait:

Отмена: если внешнее Future отменяется, все дочерние элементы (которые еще не завершены) также отменяются.

Пример:

def handler(future, loop):
    future.cancel()
    loop.stop()

@asyncio.coroutine
def do_some(arg):
    while True:
        print("Do stuff with %s" % arg)
        yield from asyncio.sleep(2)

loop = asyncio.get_event_loop()
future = asyncio.gather(do_some(1), do_some(2))
loop.add_signal_handler(signal.SIGINT, handler, future, loop)
loop.run_forever()
person Vincent    schedule 07.11.2015