Часть 2: Как реализовать многопоточность или многопроцессорность в Python

Я хотел бы публиковать больше ценных статей на этом канале, ваша поддержка имеет решающее значение для этой экосистемы. Пожалуйста, следуйте за мной и хлопайте, если вам нравится то, что я пишу. Спасибо.

Что такое нити?

Является частью процесса, выполняющегося в памяти, и может быть несколькими потоками внутри одного процесса. Каждый поток может иметь собственное пространство стека и регистры. Потоки обмениваются данными друг с другом.

Однопоточный процесс

На приведенной выше диаграмме «Однопоточный процесс» показаны код, данные и файлы в одном процессе.

Многопоточный процесс

Каждый из потоков имеет свой собственный код, данные, файлы, стек и регистры и может обмениваться информацией с другими потоками в рамках одного процесса. Таким образом, обмен информацией происходит быстрее, чем программы с несколькими процессами.

Потоковые модули в Python

Модуль threading в Python позволяет нам управлять потоками в Python. Он содержит функции для выполнения таких операций, как порождение потоков, синхронизация потоков и т. д.

Позвольте мне показать код, чтобы продемонстрировать, как он работает.

Обход без использования декоратора:

Обработка потоков с помощью декоратора:

Вы заметили различия в коде?

Декораторы могут уменьшить избыточность кода. Рекомендуется делать код проще.

Вывод:

Условие гонки

Состояние гонки возникает, когда два или более потока могут получить доступ к общим данным и попытаться изменить их одновременно. Поскольку алгоритм планирования потоков может переключаться между потоками в любое время, вы не знаете порядок, в котором потоки будут пытаться получить доступ к общим данным. Следовательно, результат изменения данных зависит от алгоритма планирования потоков, т. е. оба потока «соревнуются» в доступе/изменении данных.

Проблемы часто возникают, когда один поток выполняет действие «проверить, затем выполнить» (например, «проверить», если значение равно X, затем «действовать», чтобы сделать что-то, что зависит от значения, равного X), а другой поток делает что-то со значением в между «проверкой» и «актом». Например:

if (x == 5) // The "Check"
{
   y = x * 2; // The "Act"

   // If another thread changed x in between "if (x == 5)" and "y = x * 2" above,
   // y will not be equal to 10.
}

Состояние гонки возникает, когда два или более потока могут получить доступ к общим данным и попытаться изменить их одновременно. Поскольку алгоритм планирования потоков может переключаться между потоками в любое время, вы не знаете порядок, в котором потоки будут пытаться получить доступ к общим данным. Следовательно, результат изменения данных зависит от алгоритма планирования потоков, т. е. оба потока «соревнуются» в доступе/изменении данных.

Проблемы часто возникают, когда один поток выполняет действие «проверить, затем выполнить» (например, «проверить», если значение равно X, затем «действовать», чтобы сделать что-то, что зависит от значения, равного X), а другой поток делает что-то со значением в между «проверкой» и «актом». Например:

if (x == 5) // The "Check"
{
   y = x * 2; // The "Act"
   // If another thread changed x in between "if (x == 5)" and "y = x * 2" above,
   // y will not be equal to 10.
}

Дело в том, что у может быть 10 или что угодно, в зависимости от того, изменил ли другой поток х между проверкой и действием. У вас нет реального способа узнать.

Блокировка потока

Чтобы предотвратить возникновение условий гонки, вы обычно блокируете общие данные, чтобы гарантировать, что только один поток может получить доступ к данным за раз. Это будет означать что-то вроде этого:

// Obtain lock for x
if (x == 5)
{
   y = x * 2; // Now, nothing can change x until the lock is released. 
              // Therefore y = 10
}
// release lock for x

Тип блокировки (Python):

  1. Замок
  2. Блокировка повторного входа
  3. Глобальная блокировка интерпретатора

Обычный замок

Модуль threading в Python содержит блокировку для реализации механизма синхронизации. Он содержит следующие методы:

  1. приобрести: по существу блокирует замок

2. release: снимает блокировку

from threading import Thread, Lock
# Not a good practise to use global variables
my_global_string = "Hello World"
def add_prefix(prefix_to_add):
    # Adds suffix to global string
    
    global my_global_string
    
    # Acquire the lock over the data shared between threads
    thread_lock.acquire()
    
    # Perform operation on shared data
    my_global_string = prefix_to_add + " " + my_global_string
    
    # Release the lock
    thread_lock.release()
def add_suffix(suffix_to_add):
    # Adds suffix to global string
    
    global my_global_string
    
    # Acquire the lock over the data shared between threads
    thread_lock.acquire()
    
    # Perform operation on shared data
    my_global_string = my_global_string + " " + suffix_to_add 
    
    # Release the lock
    thread_lock.release()
    
def do_threading():
    
    thread_prefix = Thread(target=add_prefix, args=("YOLO",))
    thread_suffix = Thread(target=add_suffix, args=("BYE!!",))
    
    thread_prefix.start()
    thread_suffix.start()
    
    thread_prefix.join()
    thread_suffix.join()
    
    global my_global_string
    print("Final string is {}".format(my_global_string))

RБлокировка

Зачем использовать RLock вместо обычной блокировки?

Проблема с обычными блокировками заключается в том, что они не знают, какой поток удерживает блокировку в данный момент.

Таким образом, в случае, если, скажем, поток x удерживает блокировку и по какой-то причине пытается снова получить блокировку, он будет заблокирован (даже если сам поток X удерживает указанную блокировку.

Использование RLock (Re-entrant Lock) может решить эту проблему.

from threading import RLock
my_re_entrant_lock = RLock()
my_re_entrant_lock.acquire()
my_global_string = "yolo swag"
# If this was a conventional Lock, then this would a blocking call for the thread
# Even though the same thread is trying to acess it again.
my_re_entrant_lock.acquire()   
my_global_string += " ok bye !"
my_re_entrant_lock.release()
my_re_entrant_lock.release()

Глобальная блокировка интерпретатора

Потоки Python в основном управляются основной ОС, а также, в зависимости от системы, они могут быть потоками POSIX или потоками на основе Windows.

GIL гарантирует, что одновременно работает только один поток в интерпретаторе.

По сути, это сделано для упрощения управления памятью между потоками.

Как GIL влияет на потоки?

Поскольку GIL не допускает одновременное использование нескольких потоков, он ограничивает параллелизм, который может быть достигнут с помощью потоков.

Когда поток выполняется в интерпретаторе, он удерживает GIL, и другие потоки не могут выполняться в это время.

Что такое многопроцессорность?

В многопроцессорной обработке мы порождаем несколько процессов для параллельного выполнения.

Аналогичен многопоточности, за исключением того, что вместо потоков создаются процессы.

Многопроцессорность в python не страдает от ограничений GIL, связанных с многопоточностью.

Каждый процесс имеет свое собственное пространство памяти, в отличие от потоков, которые могут совместно использовать одно и то же пространство внутри процесса.

Давайте изменим предыдущий код для реализации многопроцессорности.

Код:

from multiprocessing import Process
def func_simple(length):
    sum_f1 = 0
    for x in range(0, length):
        sum_f1 += x
    print("Normal sum is {}".format(sum_f1))
    
def func_square(length):
    sum_f2 = 0
    for x in range(0, length):
        sum_f2 += x * x
    print("Square sum is {}".format(sum_f2))
def func_cubes(length):
    sum_f3 = 0
    for x in range(0, length):
        sum_f3 += x * x * x
    print("Cube sum is {}".format(sum_f3))
process_simple = Process(target=func_simple, args=(length,))
process_square = Process(target=func_square, args=(length,))
process_cube = Process(target=func_cube, args=(length,))

process_simple.start()
process_square.start()
process_cube.start()
    
process_simple.join()
process_square.join()
process_cube.join()

Расширенные методы:

Класс пула в многопроцессорной обработке действует как интерфейс для управления несколькими процессами.

Карта (бассейн):

start = time.time()
pool = Pool(processes=4)
input_list = range(0, 50000000)
result = pool.map(some_function, input_list) #This issues a blocking call
end = time.time()
print("Time taken is {} seconds".format(end - start))

Асинхронная карта (пул): быстрее

start = time.time()
pool = Pool(processes=4)
input_list = range(0, 50000000)
result = pool.map_async(some_function, input_list) #This issues a non blocking call
end = time.time()
print("Time taken is {} seconds".format(end - start))

Связь:



Дополнительные материалы на PlainEnglish.io. Подпишитесь на нашу бесплатную еженедельную рассылку новостей. Подпишитесь на нас в Twitter и LinkedIn. Присоединяйтесь к нашему сообществу Discord.