Многопоточность в Python: модуль threading

Современное программное обеспечение проектируется так, что его функции и задачи могут выполняться параллельно. Python предоставляет программисту мощный набор инструментов для работы с потоками в библиотеке threading.

Как работает многопоточность

Многопоточность — это выполнение программы сразу в нескольких потоках, которые выполняют её функции одновременно.

Многопоточное программирование можно спутать с мультипроцессорным. На самом деле их концепции очень похожи, но если в первом случае программа работает с потоками, то в другом — с процессами. Разница между потоками и процессами проста: потоки имеют общую память, поэтому изменения в одном потоке видны в других, а процессы используют разные области памяти.

На самом деле, если рассмотреть одноядерный процессор, операции из разных потоков не выполняются параллельно. Одно ядро может выполнить только одну операцию в единицу времени, но так как операции выполняются очень быстро, создается ощущение параллельного выполнения, псевдопараллельность. По-настоящему параллельно программы могут работать только на многоядерных процессорах, где каждое ядро может выполнять операции независимо от других.

Отличным примером использования многопоточности является программа, где отрисовка графического интерфейса и обработка ввода пользователя управляются разными потоками. Если бы обе задачи были помещены в один поток, отрисовка интерфейса прерывалась бы каждый раз, когда программа получает ввод от пользователя. Использование двух потоков позволяет сделать выполнение этих функций независимым друг от друга.

Однако при выполнении многопоточной программы на одноядерном процессоре, её производительность будет ниже, чем если бы она была написана в один поток. Это происходит потому, что на реализацию и управление потоками тратится дополнительная память.

Можно ли считать threading многопоточным?

В Python используется GIL (Global Interpreter Lock), который однопоточный. Все потоки, которые создаются с помощью threading будут работать внутри потока GIL. В связи с этим они будут обрабатываться только одним ядром. Ни о какой работе одновременно на нескольких физических ядрах процессора не может быть и речи.

А так как threading будет выполняться только на одном ядре процессора, то нету преимущества по скорости, только наоборот — threading замедлит работу.

Но без него никуда не деться, если вам нужно выполнять несколько задач одновременно:

  • Обрабатывать нажатие кнопки в графическом интерфейсе, например с помощью Tkinter. Если по нажатию кнопки надо производить много действий, которые требуют времени, то эти действия надо выполнять в другом потоке, чтобы графический интерфейс не подвис на это время. Соответственно кнопки надо блокировать, а как поток завершит вычисления — обратно разблокировать.
  • Если наша программа работает одновременно с несколькими подключенными устройствами. Они могут быть подключены к разным COM-портам.
  • Если мы загружаем файлы из сети и одновременно обрабатываем уже загруженные.
  • И так далее…
Если нам нужно, чтобы наша программа работала на нескольких физических ядрах процессора одновременно, то следует обратить внимание на другой модуль — Multiprocessing.

В чём преимущества тогда модуля Threading по сравнению с Multiprocessing? Рассмотрим их:

  • Простота использования.
  • Проще передавать данные из потока в основную программу. Вообще можно даже использовать глобальные переменные. Но при этом надо правильно проектировать программу, чтобы не было ошибок, связанных с «Состоянием гонки», которые мы рассмотрим ниже.
Таким образом, если наша программа будет запускаться на одноядерном компьютере или нагрузка на процессор будет не большой, то Threading — оптимальный выбор.

Подключение библиотеки threading

Threading – это стандартный модуль, который поставляется вместе с интерпретатором. Программисту не нужно устанавливать его, достаточно просто подключить модуль с помощью команды:

import threading

Работать с потоками можно, создавая экземпляры класса Thread. Чтобы создать отдельный, поток нужно создать экземпляр класса и применить к нему метод start(). Вот пример:

import threading
def myfunc(a, b):
    print('сумма :',a + b)
thr1 = threading.Thread(target = myfunc, args = (1, 2)).start()
print('основной поток')

основной поток
сумма : 3

Здесь мы функцию mydef запустили в отдельном потоке. В качестве аргументов функции передали числа 1 и 2.

threading.Thread()

Эта конструкция позволяет создать новый поток, создав экземпляр класса Thread. Вот как выглядят её аргументы:

Она принимает аргументы:

threading.Thread(group=None, target=None, name=None, args=(),
                 kwargs={}, *, daemon=None)

Рассмотрим их подробнее:

  • group. Имеет значение None, зарезервирована для будущего расширения при реализации класса ThreadGroup.
  • target. Это функция, которая выполняется в потоке с помощью метода run(), если передано значение None, ничего не вызывается.
  • name. Это имя потока, по умолчанию оно принимает значение «Thread-X», где X – десятичное число. Программист может задать имя вручную.
  • args. Это кортеж, в котором хранятся аргументы, передаваемые в вызываемую функцию.
  • kwargs. Это словарь, в котором хранятся аргументы, передаваемые в функцию.
  • daemon. Это параметр, который устанавливает, является ли поток демоническим. По умолчанию имеет значение None, тогда свойство daemonic наследуется от текущего потока. Программист может самостоятельно установить значение параметра.

Демоны

Демонами называют процессы, которые работают в фоновом режиме. В Python для демона есть более конкретное значение: демонический поток или поток демона. В отличие от обычных потоков поток демона автоматически завершает свою работу при закрытии программы. Иными словами, программа не будет ожидать завершения демонического потока, при её закрытии эти потоки уничтожаются, в каком бы состоянии они не находились.

Демонические потоки используют для выполнения операций, выполняемых в бесконечном цикле. В других случаях обычно используют простые потоки, которые задерживают закрытие программы, пока не завершат выполнение всех операций. Использование демонических потоков позволяет операции в фоновом режиме, которые обычно не связаны с изменением и сохранением долгосрочных данных.

Например, если программа полностью перезаписывает содержимое файла, и механизм перезаписи реализован в демоническом потоке, то при неожиданном выходе из программы данные потеряются.

В демонические потоки часто помещают функции по рисованию графического интерфейса. Рисование интерфейса — бесконечный процесс, который завершается сразу после выхода из программы, если просто поместить его в обычный поток, это будет препятствовать закрытию программы.

Методы для работы с потоками

Для создания и управления потоками используются различные методы класса Thread. С их помощью можно легко манипулировать сразу несколькими потоками и определять их поведение.

start()

Он используется для запуска созданного потока. После использования threading.Thread() создаётся новый поток, однако он неактивен. Для того чтобы он начал работу, используется метод start().

import threading 
def myfunc(a, b): 
    print('сумма :',a + b) 
thr1 = threading.Thread(target = myfunc, args = (1, 2))
thr1.start()

Здесь пока мы не вызвали метод start, функция myfunc не будет запущена.

join()

Этот метод блокирует выполнение потока, который его вызвал, до тех пор пока не завершится поток, метод которого был вызван. То есть если в потоке thread1 был вызван метод потока thread2: thread2.join(), то поток thread1 будет приостановлен до тех пор, пока выполнение thread2 не завершится.

С помощью этого метода можно заставить программу дождаться завершения демонического потока. Например, если вызвать метод в основном потоке, то программа не завершится, пока не выполнится демонический поток.

У метода join() есть аргумент timeout. По умолчанию он имеет значение None, но программист может передать в него число с плавающей точкой.

Если аргумент имеет значение по умолчанию, то выполнение потока приостанавливается, пока выполняется поток метода.

Если передать в качестве аргумента число, то для метода join() установится время ожидания, когда оно истечёт, поток продолжит свою работу.

Например, thr1.join(100) означает, что будет ожидаться завершение выполнения потока thr1 не более 100 секунд.

Так как метод join() всегда возвращает None, чтобы проверить, успел ли полностью выполниться поток за указанный timeout, нужно проверить, выполняется ли поток с помощью метода is_alive().

Рассмотрим пример:

import threading
import time
def myfunc(a, b):
    time.sleep(2.5)
    print('сумма :', a + b)
thr1 = threading.Thread(target = myfunc, args = (1, 2), daemon=True)
thr1.start()
thr1.join(0.125)
if thr1.is_alive():
    print('поток не успел завершиться')
else:
    print('вычисления завершены')

поток не успел завершиться

Здесь мы делаем поток демоническим, чтобы программа не дожидалась окончания выполнения функции. Подключаем модуль time, для того, чтобы сделать задержку в функции на 2.5 секунд. После старта потока, мы приостанавливаем основной поток на 0.125 секунд. Потом выполняем проверку is_alive(). Если выведет True, значит поток не закончил выполнение за 0.125 секунды.

run()

В этом методе описываются операции, выполняемые потоком. Он используется, когда явно создается экземпляр класса. Пример:

import threading as th
import time
class Thr1(th.Thread): # Создаём экземпляр потока Thread
    def __init__(self, var):
        th.Thread.__init__(self)
        self.daemon = True # Указываем, что этот поток - демон
        self.var = var # это интервал, передаваемый в качестве аргумента

    def run(self): # метод, который выполняется при запуске потока
        num = 1
        while True:
            y = num*num + num / (num - 10) # Вычисляем функцию
            num += 1
            print("При num =", num, " функция y =", y) # Печатаем результат
            time.sleep(self.var) # Ждём указанное количество секунд
x = Thr1(0.9)
x.start()
time.sleep(2)

При num = 2  функция y = 0.8888888888888888
При num = 3  функция y = 3.75
При num = 4  функция y = 8.571428571428571

is_alive()

Метод проверяет выполняется ли поток в данный момент. Его часто используют в связке с методом join(). Кроме того, с его помощью можно грамотно управлять выполнением потоков демонов, не позволяя им неожиданно завершить работу при закрытии программы, например:

while True:
    if thr1.is_alive() == True: # Проверяем, выполняется ли поток демон
        time.sleep(1) # Если да, ждем 1 секунду и проверяем снова
    else:
        break # Если нет, выходим из цикла и закрываем программу

Остановка потока

Бывают ситуации, когда требуется остановить поток, который работает в фоне. Допустим у нас поток у которого в функции run бесконечный цикл. В основной программе нам нужно его остановить. Тут самое простое — это создать некую переменную stop:

  • В бесконечном цикле делать постоянно её проверку и если она True, то завершать его.
  • Не использовать функции, которые могут блокировать выполнение на длительное время. Всегда использовать timeout.

Вот пример такой программы:

import threading 
stop = False
def myfunc():
    global stop
    while stop == False:
        pass
thr1 = threading.Thread(target = myfunc) 
thr1.start() 
stop = True
while thr1.is_alive() == True: 
    pass
print('поток завершился')

Здесь используем глобальную переменную stop. Когда нам нужно остановить поток, мы ей присваиваем значение True, а дальше просто ждём его завершения.

Состояние гонки

Состояние гонки или race condition – это ошибка, возникающая при неправильном проектировании многопоточной программы. Она возникает тогда, когда несколько потоков обращаются к одним и тем же данным. Например, переменная хранит число, которое пытаются одновременно изменить потоки thread1 и thread2, что приводит к непредсказуемым результатам или ошибке.

Распространена ситуация, когда один поток проверяет значение переменной на выполнение условия, чтобы совершить какое-то действие, но между проверкой условия и выполнением действия вмешивается второй поток, который изменяет значение переменной, что приводит к получению неправильных результатов, например:

x = 5
# Thread 1:
if x == 5: # Поток 1 проверяет условие и считает его верным
# Thread 2:
x = 1 # Поток два изменяет значение переменной
# Thread 1:
print("При x = 5 функция 2*x =", 2 * x) # Поток один выполняет действие

В итоге программа выведет сообщение: «При x = 5 функция 2*x = 2».

Состояние гонки может приводить к различным проблемам:

  • Утечки памяти.
  • Потеря данных.
  • Уязвимости в безопасности программы.
  • Получение ошибочных результатов.
  • Взаимные блокировки потоков.

Доступ к общим ресурсам (lock)

Для того чтобы предотвратить состояние гонки, нужно использовать блокировку threading.Lock(), которая не позволяет сразу нескольким потокам работать с одними и теми же данными. Иными словами, Lock защищает данные от одновременного доступа.

threading.Lock() – возвращает объект, который, образно выражаясь, является дверью в комнату, которая запирается, если в комнате кто-то находится. То есть если поток использовал Lock (вошел в комнату), то другой поток вынужден ждать до тех пор, пока использовавший Lock поток не откажется от него (выйдет из комнаты).

У полученного объекта есть два метода: acquire() и release(). Рассмотрим их.

acquire()

Метод позволяет потоку получить блокировку. Имеет два аргумента: blocking и timeout.

Когда вызывается с аргументом blocking равным True (значение по умолчанию), блокирует Lock до тех пор, пока он не будет разблокирован и возвращает True. Если объект уже заблокирован, поток приостанавливается и ждёт, пока объект не будет разблокирован, а затем сам блокирует его.

При вызове с аргументов False, если объект Lock разблокирован, метод блокирует его и возвращает True. Если Lock уже заблокирован, метод ничего не делает и возвращает False.

Аргумент timeout (по умолчанию -1) можно изменить, только если аргумент blocking имеет значение True. Если в качестве аргумента передать положительное значение, то объект блокируется на указанное количество секунд с учётом времени ожидания блокировки. Аргумент по умолчанию указывает методу использовать бесконечное ожидание.

release()

Этот метод разблокирует объект Lock. Интерпретатор позволяет вызывать его из любого потока, а не только из потока, который заблокировал Lock в данный момент.

Метод ничего не возвращает и вызывает ошибку RuntimeError, если вызывается, когда объект Lock уже разблокирован.

Вот пример:

import threading
lock = threading.Lock()
x = 'Рython 2'
# ...
lock.acquire()
x = 'Рython 3'
print(x)
lock.release()

Python 3

Здесь мы создаём объект lock, с его помощью мы будем безопасно считывать и изменять данные. В качестве данных, которые мы будем блокировать в данном примере это одна переменная x. Далее показано безопасное изменение данных: вначале с помощью acquire дожидаемся своей очереди доступа к ним. Затем изменяем их (в нашем примере перезаписываем значение переменной с «Python 2» на «Python 3»). Далее выводим значение в консоль. После этого освобождаем доступ для других потоков. Если все потоки, которым нужен будет доступ к данным x будут использовать lock, то можно избежать «Состояния гонки».

deadlock

При использовании Lock возникает серьезная проблема, которая приводит к полной остановки работы программы. Если вызвать метод acquire(), а объект Lock уже заблокирован, то вызвавший acquire() поток будет ждать, пока заблокировавший объект поток не вызовет release().

Если один поток вызывает метод блокировки несколько раз подряд, то выполнение потока приостанавливается, пока он сам не вызовет release(). Однако он не может вызвать release, потому что его выполнение приостановлено, что означает бесконечную блокировку программы.

Самоблокировку можно предотвратить, если удалить лишний вызов acquire(), но это не всегда возможно. Самоблокировка может происходить из-за следующий вещей:

  • Возникновение ошибок, когда Lock остаётся заблокированным.
  • Неправильное проектирование программы, когда одна функция вызывается другой функцией, у которой отсутствует блокировка.

В случае возникновения ошибок достаточно воспользоваться конструкцией try-finally или оператором with.

Вот пример с with:

lock = threading.Lock() 
with lock:
    # операторы
    pass

Конструкция try-finally позволяет удалять блокировку даже в случае возникновения ошибок, что позволяет избежать deadblock. Пример:

lock = threading.Lock()
lock.acquire()
try:
    # операторы
    pass
finally:
    lock.release()

Конструкция try-finally гарантирует, что код в finally будет исполнен всегда, независимо от ошибок и результатов блока try.

Однако это не работает в случае самоблокировки из-за неправильного проектирования программы. Для этого был создан объект RLock.

RLock

Если Lock заблокирован, он блокирует любой поток, попытавшийся сделать то же самое, даже если этот поток и является владельцем блокировки в данный момент. Например, программист написал код:

import threading
lock1 = threading.Lock()
def part1():
    lock1.acquire()
    try:
        # вычислить сумму элементов первой части объекта
        pass
    finally:
        lock1.release()
    return sum
def part2():
    lock1.acquire()
    try:
        # вычислить сумму элементов второй части объекта
        pass
    finally:
        lock1.release()
    return sum
def both_parts():
    p1 = part1()
    p2 = part2()
    return p1, p2

Данный код будет работать, но его проблема заключается в том, что при вызове функции both_parts, в ней вызываются функции part1 и part2. Между вызовами этих функций может получить доступ к данным какой-нибудь другой поток и их поменять. А что делать, если нужно избежать изменения другим потоком?

Чтобы решить проблему, нужно заблокировать lock1 и в both_parts, перепишем её:

def both_parts():
    lock1.acquire()
    try:
        p1 = part1()
        p2 = part2()
    finally:
        lock1.release()
    return p1, p2

Идея проста: внешняя both_parts блокирует поток на время выполнения функций part1 и part1. Каждая из функций также блокирует поток для суммирования своей части объекта. Однако объект Lock не позволит этого сделать, этот код приведет к полному зависанию программы, потому что для Lock нет разницы, где в потоке был вызван acquire().

RLock блокирует поток только в том случае, если объект заблокирован другим потоком. Используя RLock, поток никогда не сможет заблокировать сам себя.

Использовать RLock нужно для управления вложенным доступом к разделяемым объектам. Чтобы решить возникшую проблему с Lock в коде выше, достаточно заменить строчку «lock1 = threading.Lock()» на «lock1 = threading.RLock()».

Также следует помнить, что, хотя и можно вызывать acquire() несколько раз, метод release() нужно вызвать столько же раз. При каждом вызове acquire() уровень рекурсии увеличивается на единицу, соответственно при каждом вызове release() он уменьшается на единицу.

Передача данных с помощью очередей (Queue)

Для передачи данных с помощью очередей используется класс Queue из библиотеки queue, который импортируется командной: «from queue import Queue».

Библиотеке queue содержит все необходимые инструменты для передачи данных между потоками и реализует нужные механизмы блокировки.

Класс Queue реализует очередь FIFO, который работает так: первый элемент, который пошел в очередь, первым и выйдет из неё. Эту очередь можно сравнить с вертикальной полой трубой, в которую сверху бросают элементы.

Queue имеет параметр maxsize, принимающий только целочисленные значения. Он указывает максимальное количество элементов, которое можно поместить в очередь. Когда максимум достигается, добавление в очередь элементов блокируется, пока в ней не освободится место. Если maxsize принимает значение <= 0, то очередь является бесконечной.

Для взаимодействия с очередями используется Event, объект модуля threading. С его помощью поток может выполнить нужные операции тогда, когда получит сигнал от другого потока. Кроме того, поток не обязательно должен приостанавливать свою работу на время ожидания сигнала.

Для передачи данных и работы с очередями используются методы (работают со всеми видами очередей, а не только с Queue):

qsize()

Возвращает примерный размер очереди. Важно понимать две вещи:

  • Если qsize() больше нуля, следующий метод get() всё равно может быть заблокирован.
  • Если qsize() меньше maxsize, следующий метод put() может быть заблокирован.

Это может возникнуть из-за того что к очереди могут обратиться другие потоки и получить/записать данные сразу после того как вы получили её размер.

empty()

Метод проверяет, содержится ли что-то в очереди. Если очередь пуста, возвращается True, если очередь содержит элементы, возвращается False.

Как и с методом qsize(), возврат True или False не гарантирует, что следующий метод put() или get() не будут заблокированы.

full()

Проверяет, заполнена ли очередь. Если очередь заполнена, возвращает True, иначе возвращает False.

Как и в предыдущих методах, возврат True или False не даёт гарантий, что put() и get() не будут заблокированы.

put()

Метод помещает новый объект в очередь, имеет обязательный аргумент item и два необязательных аргумента: block = True и timeout = None.

Queue.put(item, block=True, timeout=None)

В зависимости от указанных аргументов, ожидание места в очереди будет вести себя по-разному:

  • Если аргумент block имеет значение True, а timeout — None, объект, который нужно загрузить в очередь, будет бесконечно ждать свободного места.
  • При timeout больше нуля, ожидание свободного места будет длиться не дольше указанного числа секунд, если за это время свободного места так и не появилось, возбудится исключение.
  • Если block имеет значение False, аргумент timeout игнорируется, и элемент можно поместить в очередь, только если есть свободное место, иначе сразу же возбуждается исключение.

Вот пример создания очереди на Python и добавления в неё элемента:

from queue import Queue
queue1 = Queue()
x = 'some data'
queue1.put(x)

put_nowait()

Эквивалентно вызову put(item, False). То есть помещает элемент в очередь, только если есть место, иначе вызывает исключение.

get()

Удаляет и возвращает элемент из очереди. Имеет два необязательных аргумента: block = True и timeout = None.

Queue.get(block=True, timeout=None)

В зависимости от значений аргументов ожидание объекта ведёт себя по разному:

  • Если аргументы имеют значению по умолчанию, метод ожидает объект из очереди до тех пор, пока тот не станет доступен.
  • При timeout — положительное число, то объект из очереди ожидается определенное время, по истечении которого вызовется исключение.
  • Если block имеет значение False, элемент возвращается, только если он доступен, иначе вызывается исключение (аргумент timeout игнорируется).

Вот пример. Здесь мы добавляем строку в очередь. Затем мы её получаем и выводим в консоль:

from queue import Queue
queue1 = Queue()
queue1.put('Python 3')
value = queue1.get()
print(value)

Python 3

get_nowait()

Эквивалентно вызову get(False).

task_done()

Этот метод работает в связке с методом join().

Метод показывает, что ранее поставленная задача была выполнена. После получения каждого элемента из очереди, допустим с помощью get(), нужно вызывать task_done(), чтобы уменьшить счётчик задач. Ниже описан метод join с примером

Если task_done() вызывается больше раз, чем количество элементов, помещенное в очередь, то возбуждается исключение ValueError.

join()

Блокирует поток, пока все элементы очереди не будут получены и обработаны.

Каждый раз, когда в очередь добавляется новый элемент, увеличивается счетчик незавершённых задач. При вызове task_done(), счетчик уменьшается, показывая, что обработка элемента в очереди завершена и можно переходить к следующему. Когда счетчик равен нулю, с потока снимается блокировка.

Вот пример:

from queue import Queue 
queue1 = Queue() 
queue1.put('Python 2')
queue1.put('Python 3')
print(queue1.get())
queue1.task_done()
print(queue1.get())
queue1.task_done()
queue1.join()

Этот пример для того чтобы показать работу join и task_done. Здесь всё происходит в одном потоке. Обычно пишет в очередь один поток данные, потом ждёт когда их обработают с помощью join. Другой же поток при получении каждого нового значения вызывает task_done.

Пример программы

Суть программы проста: студенты должны сдать зачетную работу. Работа сдаётся двум преподавателям, время, за которое проверят работу, зависит от рейтинга студента (чем больше рейтинг, тем лучше работа, тем меньше проверяют).

Программа:

from queue import Queue
import time, datetime, threading
students= [(99, "Андрей"),
           (76, "Александр"),
           (75, "Никита"),
           (72, "Евгений"),
           (66, "Алексей"),
           (62, "Сергей"),
           (50, "Михаил")]
def student(q):
    while True:
        # Получаем задание из очереди
        check = q.get()
        # Выводим время начала проверки
        print(check[1], 'сдал работу в', datetime.datetime.now()
            .strftime('%H:%M:%S')) 
        #Время затраченное на проверку, которое зависит от рейтинга
        time.sleep((100-check[0])/5)
        # Время окончания проверки
        print(check[1], 'забрал работу в', datetime.datetime.now()
            .strftime('%H:%M:%S'))
        # Даём сигнал о том, что задание очереди выполнено
        q.task_done()
# Создаем очередь
q = Queue()
# Загружаем в очередь студентов
for x in students:
    q.put(x)
#создаём и запускаем потоки
thread1 = threading.Thread(target=student, args=(q,), daemon=True)
thread2 = threading.Thread(target=student, args=(q,), daemon=True)
thread1.start()
time.sleep(10)
thread2.start()
# Блокируем выполнение до завершения всех заданий
q.join()
print("Этот текст напечатается после окончания блокировки")

Результаты:

Андрей сдал работу в 18:58:35
Андрей забрал работу в 18:58:36
Александр сдал работу в 18:58:36
Александр забрал работу в 18:58:40
Никита сдал работу в 18:58:40
Евгений сдал работу в 18:58:45
Никита забрал работу в 18:58:45
Алексей сдал работу в 18:58:45
Евгений забрал работу в 18:58:51
Сергей сдал работу в 18:58:51
Алексей забрал работу в 18:58:52
Михаил сдал работу в 18:58:52
Сергей забрал работу в 18:58:59
Михаил забрал работу в 18:59:02
Этот текст напечатается после окончания блокировки

Как видно из результатов, студенты обрабатывались в два потока, двумя преподавателями. q.join() заблокировало выполнение основного потока, так что текст распечатался только после завершения всех заданий очереди.

Полезные инструменты модуля threading

Threading имеет ещё несколько полезных инструментов, которые могут пригодиться для решения более специализированных задач.

Semaphore

Это один из старейших примитивов для синхронизации в истории информатики. Семафор использует внутренний счётчик, который уменьшается при каждом вызове acquire() и увеличивается при каждом вызове release(). Счётчик не может стать меньше нуля, когда он становится равным нулём, acquire() блокирует поток.

Вот пример:

import threading 
x = 'Python'
sem = threading.Semaphore()
sem.acquire()
x = 'Python 2'
sem.release()
with sem:
    x = 'Python 3'

Здесь привёл два варианта получения доступа к данным:

  • C помощью acquire и release.
  • Используя with.

Timer

Этот класс позволяет контролировать время запуска какого-либо действия. Timer является подклассом Thread.

Вот его аргументы:

threading.Timer(interval, function, args=None, kwargs=None)

Таймеры запускаются также, как и потоки, с помощью метода start(). Их можно остановить, используя метод cancel().

С помощью таймера программист может вызывать функцию, присвоить значение переменной или, например, запустить поток в определенное время.

Пример использования:

import threading
def myfunc():
    print('tick-tack')
timer = threading.Timer(4, myfunc)
timer.start()

tick-tack

Здесь функция myfunc выполнится через 4 секунды после вызова метода start().

Barrier

Этот класс позволяет реализовать простой механизм синхронизации потоков. Его можно использовать для фиксированного числа потоков, когда необходимо, чтобы каждый поток ждал выполнения какого-либо действия всеми.

Для того чтобы продолжить выполнения, все потоки должны вызвать метод wait(), если хоть один поток не сделал этого, остальные блокируются до тех пор, пока метод не будет вызван.

Так выглядят его аргументы:

threading.Barrier(parties, action=None, timeout=None)

Рассмотрим пример использования:

import threading
import time
barrier = threading.Barrier(2)
def myfunc():
    barrier.wait()
    print('отработал barrier')
thr1 = threading.Thread(target = myfunc).start()
time.sleep(1)
print('основной поток')
barrier.wait()

основной поток
отработал barrier

Здесь выставляю barrier на 2 вызова wait. То есть,  для того, чтобы выполнился код после wait, wait должен быть вызван в 2 потоках. В данном случае функция myfunc сразу запускается в потоке, но она сразу не выведет 'отработал barrier' в консоль, а дождётся когда в основном потоке будет вызван wait тоже.

Event

Event представляет собой простой механизм реализации связи между потоками: один поток даёт сигнал о событии, другие ожидают этого сигнала.

Объект события управляет внутренним флагом, который может быть установлен в True или False с помощью методов set() и clear(). Также есть методы is_set(), которым можно проверить состояние внутреннего флага. С помощью метода wait(timeout=None) можно ждать пока не выставлен флаг в True. Так же при необходимости можно задать время ожидания.

Вот пример:

import threading
import time
event = threading.Event()
def myfunc():
    time.sleep(1)
    event.set()
thr1 = threading.Thread(target = myfunc).start()
print(event.is_set())
event.wait()
print(event.is_set())
event.clear()
print(event.is_set())

False
True
False

Заключение

Возможность управления потоками в Python – это мощный инструмент в разработке больших программ. Для работы с ними используется модуль Threading и библиотека queue в связке с ним.

Каждый программист Python должен уметь работать с потоками, очередями и понимать, как устроена блокировка, доступ к данным и их передача между потоками.