Я работаю в Python3.7 с несколькими потоками, взаимодействующими с очередями. Если моя программа прерывается или обнаруживает непредвиденную ошибку, я бы хотел, чтобы она полностью очистилась и корректно завершилась. Однако я заметил, что в некоторых случаях при прерывании неблокирующей функции Queue.get() (например, с помощью SIGINT) Queue никогда не освобождает свою блокировку, поэтому моя попытка очистить любой соответствующий поток, связанный с этой очередью, остановится навсегда. Это требует, чтобы я вручную отправлял больше SIGINT, что нежелательно для неконтролируемого выполнения.
Это поведение можно увидеть в следующем коде. Нажатие ctrl-C обычно не приводит к немедленному выходу из программы. Вместо этого он зависает. Окончательная трассировка показывает, что она застряла в ожидании получения блокировки.
import threading
import queue
def test_function(test_queue, thread_terminator):
while True:
try:
test_queue.put_nowait("test")
except Full:
pass
if thread_terminator.is_set():
return
if __name__ == '__main__':
test_queue = queue.Queue()
thread_terminator = threading.Event()
test_thread = threading.Thread(target=test_function, args=(test_queue, thread_terminator))
test_thread.start()
while True:
try:
test_queue.get_nowait()
except queue.Empty:
pass
except:
thread_terminator.set()
test_thread.join()
raise
Traceback (most recent call last):
File "/usr/local/lib/python3.7/threading.py", line 1281, in _shutdown
t.join()
File "/usr/local/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/usr/local/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
Почему это происходит? Очереди Python полностью потокобезопасны, верно? И библиотека очередей здесь https://github.com/python/cpython/blob/master/Lib/queue.py, кажется, получает свою блокировку через диспетчеры контекста, поэтому мне кажется, что блокировка должна быть снята в случае исключения.
Я написал некоторый код для ручного снятия блокировки и, похоже, решил проблему. Теперь при выходе с помощью SIGINT я больше не сталкиваюсь с вышеупомянутой проблемой.
import threading
import queue
def test_function(test_queue, thread_terminator):
while True:
try:
test_queue.put_nowait("test")
except queue.Full:
pass
if thread_terminator.is_set():
return
if __name__ == '__main__':
test_queue = queue.Queue()
thread_terminator = threading.Event()
test_thread = threading.Thread(target=test_function, args=(test_queue, thread_terminator))
test_thread.start()
while True:
try:
test_queue.get_nowait()
except queue.Empty:
pass
except:
thread_terminator.set()
try:
print(f"releasing lock")
test_queue.mutex.release()
print(f"lock released")
except RuntimeError:
print("lock already released")
test_thread.join()
raise
Но этот отрывочный обходной путь не кажется мне идеальным. Что, если другой поток легитимно получил блокировку очереди, а этот код все испортил? Я лично еще не сталкивался с таким сценарием, но обеспокоен.
Как я могу безопасно предотвратить эти невыпущенные блокировки и предотвратить зависание потоков?