Блокировка потока Python никогда не освобождается, если Queue.get() прерывается

Я работаю в Python3.7 с несколькими потоками, взаимодействующими с очередями. Если моя программа прерывается или обнаруживает непредвиденную ошибку, я бы хотел, чтобы она полностью очистилась и корректно завершилась. Однако я заметил, что в некоторых случаях при прерывании неблокирующей функции Queue.get() (например, с помощью SIGINT) Queue никогда не освобождает свою блокировку, поэтому моя попытка очистить любой соответствующий поток, связанный с этой очередью, остановится навсегда. Это требует, чтобы я вручную отправлял больше SIGINT, что нежелательно для неконтролируемого выполнения.

Это поведение можно увидеть в следующем коде. Нажатие ctrl-C обычно не приводит к немедленному выходу из программы. Вместо этого он зависает. Окончательная трассировка показывает, что она застряла в ожидании получения блокировки.

import threading
import queue

def test_function(test_queue, thread_terminator):
    while True:
        try:
            test_queue.put_nowait("test")
        except Full:
            pass

        if thread_terminator.is_set():
            return

if __name__ == '__main__':
    test_queue = queue.Queue()
    thread_terminator = threading.Event()
    test_thread = threading.Thread(target=test_function, args=(test_queue, thread_terminator))
    test_thread.start()

    while True:
        try:
            test_queue.get_nowait()
        except queue.Empty:
            pass
        except:
            thread_terminator.set()
            test_thread.join()
            raise
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 1281, in _shutdown
    t.join()
  File "/usr/local/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):

Почему это происходит? Очереди Python полностью потокобезопасны, верно? И библиотека очередей здесь https://github.com/python/cpython/blob/master/Lib/queue.py, кажется, получает свою блокировку через диспетчеры контекста, поэтому мне кажется, что блокировка должна быть снята в случае исключения.

Я написал некоторый код для ручного снятия блокировки и, похоже, решил проблему. Теперь при выходе с помощью SIGINT я больше не сталкиваюсь с вышеупомянутой проблемой.

import threading
import queue

def test_function(test_queue, thread_terminator):
    while True:
        try:
            test_queue.put_nowait("test")
        except queue.Full:
            pass

        if thread_terminator.is_set():
            return

if __name__ == '__main__':
    test_queue = queue.Queue()
    thread_terminator = threading.Event()
    test_thread = threading.Thread(target=test_function, args=(test_queue, thread_terminator))
    test_thread.start()

    while True:
        try:
            test_queue.get_nowait()
        except queue.Empty:
            pass
        except:
            thread_terminator.set()
            try:
               print(f"releasing lock")
               test_queue.mutex.release()
               print(f"lock released")
            except RuntimeError:
                print("lock already released")
            test_thread.join()
            raise

Но этот отрывочный обходной путь не кажется мне идеальным. Что, если другой поток легитимно получил блокировку очереди, а этот код все испортил? Я лично еще не сталкивался с таким сценарием, но обеспокоен.

Как я могу безопасно предотвратить эти невыпущенные блокировки и предотвратить зависание потоков?


person Kevin Guo    schedule 03.08.2019    source источник
comment
Может быть связано с этим: stackoverflow.com/a/11436603/11877195   -  person Sahsahae    schedule 03.08.2019
comment
Спасибо! Проблема с этим сообщением заключалась в том, как сигнализировать потоку об остановке, но я уже учитываю это, используя событие, чтобы сигнализировать потоку о выходе из цикла управления. Проблема здесь заключалась в том, что, хотя у потока был сигнал к остановке, он был заморожен из-за невыпущенной блокировки очереди.   -  person Kevin Guo    schedule 05.08.2019