Скрипт зависает при выходе при использовании atexit для завершения потоков

Я играю с потоками на python 3.7.4 и хочу использовать atexit для регистрации функции очистки, которая (чисто) завершит потоки.

Например:

# example.py
import threading
import queue
import atexit
import sys

Terminate = object()

class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()

    def send_message(self, m):
        self.queue.put_nowait(m)

    def run(self):
        while True:
            m = self.queue.get()
            if m is Terminate:
                break
            else:
                print("Received message: ", m)


def shutdown_threads(threads):
    for t in threads:
        print(f"Terminating thread {t}")
        t.send_message(Terminate)
    for t in threads:
        print(f"Joining on thread {t}")
        t.join()
    else:
        print("All threads terminated")

if __name__ == "__main__":
    threads = [
        Worker()
        for _ in range(5)
    ]
    atexit.register(shutdown_threads, threads)

    for t in threads:
        t.start()

    for t in threads:
        t.send_message("Hello")
        #t.send_message(Terminate)

    sys.exit(0)

Однако кажется, что взаимодействие с потоками и очередями в обратном вызове atexit создает взаимоблокировку с некоторой внутренней процедурой завершения работы:

$ python example.py
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
^CException ignored in: <module 'threading' from '/usr/lib64/python3.7/threading.py'>
Traceback (most recent call last):
  File "/usr/lib64/python3.7/threading.py", line 1308, in _shutdown
    lock.acquire()
KeyboardInterrupt
Terminating thread <Worker(Thread-1, started 140612492904192)>
Terminating thread <Worker(Thread-2, started 140612484511488)>
Terminating thread <Worker(Thread-3, started 140612476118784)>
Terminating thread <Worker(Thread-4, started 140612263212800)>
Terminating thread <Worker(Thread-5, started 140612254820096)>
Joining on thread <Worker(Thread-1, stopped 140612492904192)>
Joining on thread <Worker(Thread-2, stopped 140612484511488)>
Joining on thread <Worker(Thread-3, stopped 140612476118784)>
Joining on thread <Worker(Thread-4, stopped 140612263212800)>
Joining on thread <Worker(Thread-5, stopped 140612254820096)>
All threads terminated

(KeyboardInterrupt - это я использую ctrl-c, так как процесс, кажется, зависает на неопределенный срок).

Однако, если я отправлю сообщение Terminate перед выходом (раскомментируйте строку после t.send_message("Hello")), программа не зависнет и корректно завершится:

$ python example.py
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
Received message:  Hello
Terminating thread <Worker(Thread-1, stopped 140516051592960)>
Terminating thread <Worker(Thread-2, stopped 140516043200256)>
Terminating thread <Worker(Thread-3, stopped 140515961992960)>
Terminating thread <Worker(Thread-4, stopped 140515953600256)>
Terminating thread <Worker(Thread-5, stopped 140515945207552)>
Joining on thread <Worker(Thread-1, stopped 140516051592960)>
Joining on thread <Worker(Thread-2, stopped 140516043200256)>
Joining on thread <Worker(Thread-3, stopped 140515961992960)>
Joining on thread <Worker(Thread-4, stopped 140515953600256)>
Joining on thread <Worker(Thread-5, stopped 140515945207552)>
All threads terminated

Возникает вопрос: когда выполняется эта процедура threading._shutdown относительно обработчиков atexit? Имеет ли смысл взаимодействовать с потоками в обработчиках atexit?


person Charles Langlois    schedule 18.11.2019    source источник
comment
Почему не вы хотите сделать #t.send_message(Terminate)?   -  person stovfl    schedule 20.11.2019
comment
Очевидно, интерпретатор не вызывает обработчики atexit до тех пор, пока не завершатся все потоки, не являющиеся демонами, что подозрительно похоже на ошибку, исправленную в Python 2.6.5 (см. - stackoverflow.com/questions/3713360/ и bugs.python.org/issue1722344). Обходной путь может состоять в том, чтобы обернуть основной код в try / finally и вручную вызвать shutdown_threads(threads) самостоятельно.   -  person martineau    schedule 20.11.2019


Ответы (2)


Вы можете использовать один поток демона, чтобы попросить потоки, не являющиеся демонами, изящно очиститься. В качестве примера, когда это необходимо, если вы используете стороннюю библиотеку, которая запускает поток, не являющийся демоном, вам придется либо изменить эту библиотеку, либо сделать что-то вроде:

import threading

def monitor_thread():
    main_thread = threading.main_thread()
    main_thread.join()
    send_signal_to_non_daemon_thread_to_gracefully_shutdown()


monitor = threading.Thread(target=monitor_thread)
monitor.daemon = True
monitor.start()

start_non_daemon_thread()

Чтобы поместить это в контекст исходного кода плаката (обратите внимание, что нам не нужна функция atexit, поскольку она не будет вызываться, пока все потоки, не являющиеся демонами, не будут остановлены):

if __name__ == "__main__":
    threads = [
        Worker()
        for _ in range(5)
    ]
    
    for t in threads:
        t.start()

    for t in threads:
        t.send_message("Hello")
        #t.send_message(Terminate)

    def monitor_thread():
        main_thread = threading.main_thread()
        main_thread.join()
        shutdown_threads(threads)

    monitor = threading.Thread(target=monitor_thread)
    monitor.daemon = True
    monitor.start()
person garlon4    schedule 24.07.2020
comment
Это сработало, как и ожидалось, но я заметил, что это также работает, если monitor_thread не является потоком демона. Мое объяснение состоит в том, что он уже ожидает main_thread.join() и поэтому проснется, когда main_thread выйдет. В документах говорится, что потоки демона резко останавливаются при завершении работы, что заставляет меня думать, что здесь мы действительно можем захотеть, чтобы monitor не был потоком демона. - person lekv; 27.12.2020

atexit.register(func) регистрирует func как функцию, которая будет выполняться при завершении.

После выполнения последней строки кода (это sys.exit(0) в приведенном выше примере) в основном потоке, threading._shutdown был вызван (интерпретатором) для ожидания завершения всех потоков, не являющихся демонами (воркеры, созданные в приведенном выше примере).

Вся программа Python завершает работу, когда не остается ни одного активного потока, отличного от демона.

Таким образом, после нажатия CTRL+C основной поток завершался сигналом SIGINT, а затем интерпретатор вызывал зарегистрированные функции atexit.

Между прочим, если вы передадите daemon=True в Thread.__init__, программа будет работать без вмешательства человека.

person Jacky1205    schedule 19.11.2019
comment
Да, но я хочу, чтобы потоки завершались изящно, давая им возможность выполнить код очистки. Для этого и предназначен мой обработчик atexit. - person Charles Langlois; 19.11.2019
comment
Как сказано выше, вы можете добиться этого, сделав Worker потоком демона (super().__init__(daemon=True). - person Jacky1205; 20.11.2019
comment
Насколько я понимаю, потоки демона не имеют возможности изящно обработать свое завершение, например. очистить любые ресурсы, которые они могут удерживать. Они просто зверски убиваются при выходе из основного потока и вообще не рассматриваются в процессе завершения работы во время выполнения. См. здесь потенциальные проблемы с их использованием, например: joeshaw.org/python -daemon-threads-считается-вредным - person Charles Langlois; 20.11.2019
comment
atexit является исключением, которое позволяет нам выполнить некоторые действия по очистке до того, как интерпретатор Python выполнит настоящую финализацию. › На данный момент интерпретатор все еще не поврежден (github.com /python/cpython/blob/мастер/Python/) - person Jacky1205; 21.11.2019
comment
В качестве доказательства вы можете добавить ведение журнала после того, как Worker получил Terminate из очереди. Вы увидите, что рабочие все еще живы во время выполнения atexit зарегистрированных функций. - person Jacky1205; 21.11.2019
comment
Хорошо, я вижу, это имеет смысл. Либо я использую потоки демона и atexit, либо мне нужно выполнить отключение вручную перед выходом. Спасибо! - person Charles Langlois; 21.11.2019