Часть 2: Как реализовать многопоточность или многопроцессорность в Python
Я хотел бы публиковать больше ценных статей на этом канале, ваша поддержка имеет решающее значение для этой экосистемы. Пожалуйста, следуйте за мной и хлопайте, если вам нравится то, что я пишу. Спасибо.
Что такое нити?
Является частью процесса, выполняющегося в памяти, и может быть несколькими потоками внутри одного процесса. Каждый поток может иметь собственное пространство стека и регистры. Потоки обмениваются данными друг с другом.
Однопоточный процесс
На приведенной выше диаграмме «Однопоточный процесс» показаны код, данные и файлы в одном процессе.
Многопоточный процесс
Каждый из потоков имеет свой собственный код, данные, файлы, стек и регистры и может обмениваться информацией с другими потоками в рамках одного процесса. Таким образом, обмен информацией происходит быстрее, чем программы с несколькими процессами.
Потоковые модули в Python
Модуль threading в Python позволяет нам управлять потоками в Python. Он содержит функции для выполнения таких операций, как порождение потоков, синхронизация потоков и т. д.
Позвольте мне показать код, чтобы продемонстрировать, как он работает.
Обход без использования декоратора:
Обработка потоков с помощью декоратора:
Вы заметили различия в коде?
Декораторы могут уменьшить избыточность кода. Рекомендуется делать код проще.
Вывод:
Условие гонки
Состояние гонки возникает, когда два или более потока могут получить доступ к общим данным и попытаться изменить их одновременно. Поскольку алгоритм планирования потоков может переключаться между потоками в любое время, вы не знаете порядок, в котором потоки будут пытаться получить доступ к общим данным. Следовательно, результат изменения данных зависит от алгоритма планирования потоков, т. е. оба потока «соревнуются» в доступе/изменении данных.
Проблемы часто возникают, когда один поток выполняет действие «проверить, затем выполнить» (например, «проверить», если значение равно X, затем «действовать», чтобы сделать что-то, что зависит от значения, равного X), а другой поток делает что-то со значением в между «проверкой» и «актом». Например:
if (x == 5) // The "Check"
{
y = x * 2; // The "Act"
// If another thread changed x in between "if (x == 5)" and "y = x * 2" above,
// y will not be equal to 10.
}
Состояние гонки возникает, когда два или более потока могут получить доступ к общим данным и попытаться изменить их одновременно. Поскольку алгоритм планирования потоков может переключаться между потоками в любое время, вы не знаете порядок, в котором потоки будут пытаться получить доступ к общим данным. Следовательно, результат изменения данных зависит от алгоритма планирования потоков, т. е. оба потока «соревнуются» в доступе/изменении данных.
Проблемы часто возникают, когда один поток выполняет действие «проверить, затем выполнить» (например, «проверить», если значение равно X, затем «действовать», чтобы сделать что-то, что зависит от значения, равного X), а другой поток делает что-то со значением в между «проверкой» и «актом». Например:
if (x == 5) // The "Check" { y = x * 2; // The "Act"
// If another thread changed x in between "if (x == 5)" and "y = x * 2" above, // y will not be equal to 10. }
Дело в том, что у может быть 10 или что угодно, в зависимости от того, изменил ли другой поток х между проверкой и действием. У вас нет реального способа узнать.
Блокировка потока
Чтобы предотвратить возникновение условий гонки, вы обычно блокируете общие данные, чтобы гарантировать, что только один поток может получить доступ к данным за раз. Это будет означать что-то вроде этого:
// Obtain lock for x
if (x == 5)
{
y = x * 2; // Now, nothing can change x until the lock is released.
// Therefore y = 10
}
// release lock for x
Тип блокировки (Python):
- Замок
- Блокировка повторного входа
- Глобальная блокировка интерпретатора
Обычный замок
Модуль threading в Python содержит блокировку для реализации механизма синхронизации. Он содержит следующие методы:
- приобрести: по существу блокирует замок
2. release: снимает блокировку
from threading import Thread, Lock # Not a good practise to use global variables my_global_string = "Hello World" def add_prefix(prefix_to_add): # Adds suffix to global string global my_global_string # Acquire the lock over the data shared between threads thread_lock.acquire() # Perform operation on shared data my_global_string = prefix_to_add + " " + my_global_string # Release the lock thread_lock.release() def add_suffix(suffix_to_add): # Adds suffix to global string global my_global_string # Acquire the lock over the data shared between threads thread_lock.acquire() # Perform operation on shared data my_global_string = my_global_string + " " + suffix_to_add # Release the lock thread_lock.release() def do_threading(): thread_prefix = Thread(target=add_prefix, args=("YOLO",)) thread_suffix = Thread(target=add_suffix, args=("BYE!!",)) thread_prefix.start() thread_suffix.start() thread_prefix.join() thread_suffix.join() global my_global_string print("Final string is {}".format(my_global_string))
RБлокировка
Зачем использовать RLock вместо обычной блокировки?
Проблема с обычными блокировками заключается в том, что они не знают, какой поток удерживает блокировку в данный момент.
Таким образом, в случае, если, скажем, поток x удерживает блокировку и по какой-то причине пытается снова получить блокировку, он будет заблокирован (даже если сам поток X удерживает указанную блокировку.
Использование RLock (Re-entrant Lock) может решить эту проблему.
from threading import RLock my_re_entrant_lock = RLock() my_re_entrant_lock.acquire() my_global_string = "yolo swag" # If this was a conventional Lock, then this would a blocking call for the thread # Even though the same thread is trying to acess it again. my_re_entrant_lock.acquire() my_global_string += " ok bye !" my_re_entrant_lock.release() my_re_entrant_lock.release()
Глобальная блокировка интерпретатора
Потоки Python в основном управляются основной ОС, а также, в зависимости от системы, они могут быть потоками POSIX или потоками на основе Windows.
GIL гарантирует, что одновременно работает только один поток в интерпретаторе.
По сути, это сделано для упрощения управления памятью между потоками.
Как GIL влияет на потоки?
Поскольку GIL не допускает одновременное использование нескольких потоков, он ограничивает параллелизм, который может быть достигнут с помощью потоков.
Когда поток выполняется в интерпретаторе, он удерживает GIL, и другие потоки не могут выполняться в это время.
Что такое многопроцессорность?
В многопроцессорной обработке мы порождаем несколько процессов для параллельного выполнения.
Аналогичен многопоточности, за исключением того, что вместо потоков создаются процессы.
Многопроцессорность в python не страдает от ограничений GIL, связанных с многопоточностью.
Каждый процесс имеет свое собственное пространство памяти, в отличие от потоков, которые могут совместно использовать одно и то же пространство внутри процесса.
Давайте изменим предыдущий код для реализации многопроцессорности.
Код:
from multiprocessing import Process def func_simple(length): sum_f1 = 0 for x in range(0, length): sum_f1 += x print("Normal sum is {}".format(sum_f1)) def func_square(length): sum_f2 = 0 for x in range(0, length): sum_f2 += x * x print("Square sum is {}".format(sum_f2)) def func_cubes(length): sum_f3 = 0 for x in range(0, length): sum_f3 += x * x * x print("Cube sum is {}".format(sum_f3)) process_simple = Process(target=func_simple, args=(length,)) process_square = Process(target=func_square, args=(length,)) process_cube = Process(target=func_cube, args=(length,)) process_simple.start() process_square.start() process_cube.start() process_simple.join() process_square.join() process_cube.join()
Расширенные методы:
Класс пула в многопроцессорной обработке действует как интерфейс для управления несколькими процессами.
Карта (бассейн):
start = time.time() pool = Pool(processes=4) input_list = range(0, 50000000) result = pool.map(some_function, input_list) #This issues a blocking call end = time.time() print("Time taken is {} seconds".format(end - start))
Асинхронная карта (пул): быстрее
start = time.time() pool = Pool(processes=4) input_list = range(0, 50000000) result = pool.map_async(some_function, input_list) #This issues a non blocking call end = time.time() print("Time taken is {} seconds".format(end - start))
Связь:
Дополнительные материалы на PlainEnglish.io. Подпишитесь на нашу бесплатную еженедельную рассылку новостей. Подпишитесь на нас в Twitter и LinkedIn. Присоединяйтесь к нашему сообществу Discord.