Асинхронность. Многопоточность. Многопроцессорность.
Введение
Со временем новоиспеченный программист понимает, что решение задач из курсов по программированию это обычное введение, своего рода отправная точка в обширное, сложное и невероятно интересное ремесло - программирование. Написать программу, которая выполняет задуманную задачу это достаточно, уместнее даже употребить здесь термин относительно, простая задача, куда более трудная задача написать программу, которая выполняет задуманную задачу быстро. Скорость работы программы - пожалуй самая важная задача, бесспорно после самого факта ее работы. До начала этого блока мы сталкивались с вами только с синхронно работающими программами. Что имеется в виду под синхронным программированием? Синхронная модель программирования подразумевает, во-первых, выделение одного потока под одну задачу, а во вторых выполнение операций внутри этой задачи происходит последовательно. Отсюда вытекает новый вопрос, что такое поток? Для понимания этого вопроса нужно немного разобраться как устроена работа операционных систем наших устройств. Если в Linux открыть System Monitor, аналог диспетчера задач на Windows, мы увидим список процессов запущенных на нашем компьютере.

Запуск нового приложения на компьютере, например, запуск браузера, запускает процесс. Процессы работаю изолированно, друг от друга. Процессы можно разделить на 'пользовательские' - приложения, запущенные пользователями, тот же браузер и 'системные' - процессы, работающие над служебными операциями. Каждый из этих процессов потребляет ресурсы устройства. Главная задача операционной системы - контроль за потреблением этих ресурсов и распределение этих ресурсов между процессами. Чем больше оперативной памяти у устройства, тем быстрее оно способно обрабатывать большее количество процессов. Для обработки процессов операционная система последовательно выделяет промежутки времени, в течении которых процессы могут пользоваться ресурсами. Операционная система сама решает какому процессу сейчас это время нужнее. Отсюда и вытекает нужда в написании программ, которые будут минимально расходовать ресурсы устройства, и на отработку которых операционная система будет выделять меньше временных промежутков. Сам по себе процесс ничего не выполняет, можно сказать, что процесс это некий 'контейнер', содержимое которого изолировано от содержимого других 'контейнеров'. Этим содержимым и являются потоки. Процесс может содержать несколько потоков, работающих изолированно внутри процесса и у потоков также есть приоритет исполнения, который контролируется операционной системой. Подытоживая, все программы исполняются в этих самых потоках. Вернемся к синхронному программированию. Как было сказано ранее, данная модель подразумевает выделение одного потока под одну задачу. Важно понять, под словами 'под одну задачу' мы понимаем то, что в данном выделенном промежутке времени для выполнения потока, в нем может происходить только одна задача, и когда эта задача отработает поток может принять в себя следующую задачу. Такая среда синхронного программирования называется однопоточной. Выделяют также многопоточную среду, в таком варианте существует несколько параллельных потоков, внутри которых также одна за другой могут выполняться задачи. Несложно догадаться, что многопоточная среда работает быстрее.

Я нарисовал такую схему. Это скорее пример для наглядности, в реальных ситуациях задач обычно больше, чем потоков, так что многопоточную модель можно представить как, к примеру, окошки для записи на прием к врачам в больницах, окошек(потоков) обычно меньше, чем людей(задач) и как только новое окошко(поток) становится свободным, новый человек(задача) может подойти туда для решения своего вопроса. Очевидно, что одно окошко(поток) будет обрабатывать вопросы посетителей(задачи) медленнее чем несколько окошек(потоков). Казалось бы, отличное решение вопроса скорости, больше параллельно работающих потоков - больше исполняемых задач в единицу времени, только вот мы тут говорим в первую очередь о python и python не поддерживает параллельную работу потоков, подробнее об этом мы поговорим, когда будем рассматривать библиотеки для работы с потоками. Но это совсем не страшно ведь существует способ обработки задач, работающий еще быстрее многопоточной среды синхронного программирования, но это утверждение справедливо, только когда речь идет об I/O-bound операциях(об этом дальше).
Из чего вообще состоит задача, или раз мы тут говорим о программировании, можно сказать программа. Программа состоит из запроса(request), то есть начала выполнения программы и ответа на этот запрос(response), то есть окончания работы программы. Как нам известно программы исполняются сверху вниз и в синхронном программировании это можно представить так.

В поток поступила программа, и допустим программа состоит из 3 функций. В синхронном программировании пока мы не получим response на отправленный request, новый request не начнется, то есть функции внутри программы выполняются в порядке очереди. А что если бы мы захотели изменить это поведение подобным образом.

При таком поведении для начала исполнения нового запроса не нужно дожидаться ответа на предыдущий запрос. Такое поведение ускоряет работу программы. И именно такое поведение лежит в основе асинхронного программирования.
CPU-bound и I/O-bound
Перед переходом к практике нужно кратко обсудить еще пару моментов. В программировании различают два типа процессинга: CPU-bound и I/O-bound. Определение к какому из этих типов относится процесс, поможет с выбором правильного подхода к оптимизации кода. CPU-bound - операции завязанные исключительно на мощности процессора (CPU - central processing unit). I/O-bound - операции завязанные на прочих, более медленных устройствах компьютера чем CPU, например SSD диск (I/0 - Input/Output - Ввод/Вывод). Ранее я упоминал, что операционная система сама принимает решение для какого процесса сейчас нужнее временной отрезок, такая модель характерна в первую очередь для CPU-bound. Для I/O-bound характерен тот подход, который мы разобрали на схеме выше, подход при котором запрос, ответ на который еще не получен не блокирует выполнение всей программы, а дает возможность начаться новому запросу.
Асинхронное программирование
Библиотека asyncio
После теории становится понятно из каких терминов сформировано название библиотеки asyncio, async - асинхронность и io - I/O-bound. asyncio не единственная библиотека, через которую можно реализовать идею асинхронного программирования, но для углубления в эту тему asyncio отлично поможет, тем более asyncio предустановлен в стандартный набор библиотек python начиная с версии 3.4. Скорее всего функционала asyncio будет достаточно для реализации любой вашей задачи связанной с асинхронным программированием. И на самом деле синтаксис asyncio очень прост. Реализация асинхронности основывается на, так называемых, корутинах(coroutines). Корутины это обычные функции, перед которыми стоит ключевое слово async. А ключевое слово await - используется для операций, результат которых мы хотим получать асинхронно. И последний момент, для того чтобы корутины работали программу необходимо обернуть в цикл обработки событий - event loop, и в asyncio event loop запускается командой asyncio.run(). Два ключевых слова и одна команда это не вся библиотека, но знакомство мы начнем именно с этого.
import time def func(): for n in range(2): print(1) time.sleep(1) print(2) def main(): star_time = time.time() func() finish_time = time.time() release_time = finish_time - star_time print(release_time) if __name__ == '__main__': main()
1 2 1 2 2.002147674560547 Process finished with exit code 0
import time import asyncio async def async_func(): print(1) await asyncio.sleep(1) print(2) async def main(): star_time = time.time() await asyncio.gather(async_func(), async_func()) finish_time = time.time() release_time = finish_time - star_time print(release_time) if __name__ == '__main__': asyncio.run(main())
1 1 2 2 1.000763177871704 Process finished with exit code 0
Рассмотрим две программы. Главная функция обеих программ выводит 1 и затем засыпает на 1 секунду и выводит 2, это действие повторяется дважды и в обеих программах мы засекаем время ее исполнения с помощью библиотеки time. Программа example_0 исполняется синхронно, у нас есть функция func(), внутри которой в цикле два раза повторяется поочередный вывод цифр 1 и 2 с паузой в одну секунду. Затем в функции main() мы вызываем функцию func() и засекаем время ее работы методом time.time() библиотеки time. Не забываем про конструкцию if __name__ == '__main__'. Как итог, программа отработала за 2 секунды, чего и следовало ожидать.
Теперь перейдем к программе example_1. Импортируем asyncio. И сделаем из обычных функций, функции корутины. Добавим ключевое слово async перед функцией async_func() и main(). И также ключевые слова await перед функциями, продолжение работы которых не должно мешать началу работы новой функции. Обратите внимание мы использовали не обычный sleep модуля time, а его аналог из библиотеки asincio, тем не менее время работы программы мы измерили так же, как и в первой программе. Также в первой программе тело первой функции было обернуто в цикл for, во второй же мы исполнили эту функцию дважды внутри функции main с помощью метода asyncio.gather. Дело в том, что async for работает по другому, нежели обычный for, но для того, чтобы не перегружать материал, этого мы коснемся немного позже, использование же обычного цикла for, как вы понимаете, поломает всю нашу асинхронность. И конечно запустим event loop методом asyncio.run() внутри конструкции if __name__ == '__main__'. Результат наглядно показывает то, о чем мы говорили в теории, асинхронные функции начинают исполнять новый запрос не дожидаясь ответа на отправленный ранее. Отсюда в первом примере 1 и 2 чередуются, а во втором мы отправили 2 запроса, подождали секунду и получили 2 ответа. Тем самым сократив время работы на одну секунду. Одна секунда это не очень много, тем не менее это все равно быстрее. А что если мы хотим повторить поочередный вывод цифр 1 и 2 больше чем дважды.
import time def func(): for n in range(10): print(1) time.sleep(1) print(2) def main(): star_time = time.time() func() finish_time = time.time() release_time = finish_time - star_time print(release_time) if __name__ == '__main__': main()
1 2 1 2 ... 1 2 10.008908033370972 Process finished with exit code 0
import time import asyncio async def async_func(): print(1) await asyncio.sleep(1) print(2) async def main(): star_time = time.time() await asyncio.gather(async_func(), async_func(), async_func(), async_func(), async_func(), async_func(), async_func(), async_func(), async_func(), async_func(),) finish_time = time.time() release_time = finish_time - star_time print(release_time) if __name__ == '__main__': asyncio.run(main())
1 ... 1 2 ... 2 1.001136064529419 Process finished with exit code 0
Скажем 10 раз. Результат все тот же. Я попробовал исполнить эту функцию 200 раз, результат был тот же, 200 секунд для первой и 1 для второй, затем 2000 раз, результат 2000 секунд для первой программы и 1 для второй, затем я выполнил это действие 10000 раз, и результат для второй программы составил 1.38 секунд, против 10000 секунд в первой, конечно, ждать 2000 и 10000 секунд в обоих случаях я не стал, результат и так очевиден. И исполнение программы за 1.38 секунд против 10000 секунд, с помощью всего пары простых изменений,

результат впечатляет.
async for
Раз уж мы ранее коснулись async for, то не будем далеко это откладывать. В основе asyncio лежат обычные генераторы, те самые которые мы учим, когда только начинаем осваивать python. И кажется раз в основе asyncio лежат генераторы, которые в свою очередь возвращают итераторы, то обойтись без for не получится. Только вот у нас тут все асинхронное, асинхронный итератор, асинхронный итерируемый объект и, следовательно, for у нас тоже асинхронный. А также это значит, что у нас есть асинхронные аналоги методов __iter__() и __next__(), а именно __aiter__() и __anext__().
import asyncio import time async def func(arg_1_1): for i in range(arg_1_1): yield i await asyncio.sleep(1) async def run(arg_1_2): async for i in func(arg_1_2 + 1): print(i) print('asyncio.sleep(1)') async def main(): star_time = time.time() await asyncio.gather(run(4), run(2)) finish_time = time.time() release_time = finish_time - star_time print(release_time) if __name__ == '__main__': asyncio.run(main())
0 asyncio.sleep(1) 0 asyncio.sleep(1) 1 asyncio.sleep(1) 1 asyncio.sleep(1) 2 asyncio.sleep(1) 2 asyncio.sleep(1) 3 asyncio.sleep(1) 4 asyncio.sleep(1) 5.006062269210815 Process finished with exit code 0
Рассмотрим пример. Функция run принимает какое-то целочисленное значение в качестве аргумента. Внутри функции run создадим асинхронный for внутри которого будем проходить по функции func столько раз, сколько мы сами задали, в данном примере мы асинхронно запускаем два цикла и выводим результат прохода по ним, а после засыпаем на 1 секунду. Поскольку у нас работаю два цикла одновременно, сообщение о засыпании мы видим вдвое чаще чем должны. Что имеем в результате, у нас есть два цикла, в каждом из которых после очередной итерации мы засыпаем на секунду, в первом цикле мы проходим по нему 5 раз, во втором 3. Но вместо 8 секунд программа исполнилась за 5.
import time def func(arg_1_1): for i in range(arg_1_1): yield i time.sleep(1) def run(arg_1_2): for i in func(arg_1_2 + 1): print(i) print('asyncio.sleep(1)') def main(): star_time = time.time() run(2) run(4) finish_time = time.time() release_time = finish_time - star_time print(release_time) if __name__ == '__main__': main()
0 asyncio.sleep(1) 1 asyncio.sleep(1) 2 asyncio.sleep(1) 0 asyncio.sleep(1) 1 asyncio.sleep(1) 2 asyncio.sleep(1) 3 asyncio.sleep(1) 4 asyncio.sleep(1) 8.008046627044678 Process finished with exit code 0
А вот тот же пример, с использованием обычного for. Как и следует ожидать от такой программы, сначала отрабатывает первый цикл, а только затем второй. И вот уже ее время исполнения 8 секунд, все как и должно быть.
aiohttp
Асинхронность нам в большей степени интересна со стороны web-разработки.
Для асинхронной web-разработки существуют отдельные фреймворки, которые только набирают популярность, с такими фреймворками мы познакомимся отдельно, сейчас же хотелось бы поговорить о более низкоуровневых инструментах для асинхронной работы с http запросами.
Для начала установим библиотеку aiohttp
pip install aiohttp
Мы не станем разбираться с ней очень подробно, этот раздел нужен скорее для дополнительных примеров по работе с корутинами. Плюс хочется показать вам один из сайтов для тренировки по работе с http запросами.
Существует сайт {JSON} Placeholder на нем есть небольшая база данных связанных между собой таблиц, они уже представлены в json формате, мы можем делать к ним запросы и получать эти данные.
Например, есть страница с альбомами фотографий.
У каждого альбома есть id пользователя, который создал этот альбом, всего в базе 10 пользователей и 100 альбомов, по 10 на каждого, есть id самого альбома и есть его название.
Как бы в самом простом формате мы могли бы забрать данные с этого сайта и вывести их в консоли? С помощью библиотеки requests.
Если ранее вы с ней не работали, то потребуется ее установить
pip install requests
import requests url = 'https://jsonplaceholder.typicode.com/albums?' print(requests.get(url).text)
[ { "userId": 1, "id": 1, "title": "quidem molestiae enim" }, { "userId": 1, "id": 2, "title": "sunt qui excepturi placeat culpa" }, ...
Очень просто. Импортируем библиотеку requests, пишем url, применяем к нему метод get для получения данных и к полученным данным метод text для вывода результата.
Получаем 100 записей, все 100, конечно, вставлять в результат я не стал, формат у всех такой же как у первых двух.
В url мы можем добавить параметр
url = 'https://jsonplaceholder.typicode.com/albums?userId=1'
добавим ?userId=1, тем самым заберем только записи пользователя с id=1, первые 10 записей.
Таким образом мы получаем данные в json формате, чтобы преобразовать их к привычному python формату надо применить к полученному ответу метод .json()
import requests url = 'https://jsonplaceholder.typicode.com/albums?userId=1' response = requests.get(url) print(response.json())
print() распечатает нам список из 10 словарей, каждый из которых содержит информацию об отдельном альбоме.
import requests class Album: def __init__(self, user_id, album_id, album_title): self.user_id = user_id self.album_id = album_id self.album_title = album_title @staticmethod def json_render(obj): user_id = obj['userId'] album_id = obj['id'] album_title = obj['title'] return Album(user_id, album_id, album_title) def albums_by_user(user): url = f'https://jsonplaceholder.typicode.com/albums?userId={user}' response = requests.get(url) albums_json = response.json() return [Album.json_render(album) for album in albums_json] print([album.album_title for album in albums_by_user(1)])
['quidem molestiae enim', 'sunt qui excepturi placeat culpa', 'omnis laborum odio', 'non esse culpa molestiae omnis sed optio', 'eaque aut omnis a', 'natus impedit quibusdam illo est', 'quibusdam autem aliquid et et quia', 'qui fuga est a eum', 'saepe unde necessitatibus rem', 'distinctio laborum qui']
Можно добавить объект каждого отдельного альбома в виде экземпляра класса Album. В этом же классе добавим метод json_render для создания объекта класса из полученного с get запросом словаря с информацией об альбоме.
Сам запрос снесем в функцию albums_by_user, куда можно будет передать любого пользователя и забрать его альбомы, а возвращать функция будет список экземпляров класса Album.
На печать выведем список названий альбомов.
import requests class Album: """ Класс для объявления альбомов. Источник https://jsonplaceholder.typicode.com/albums Каждый альбом содержит 3 параметра. """ def __init__(self, user_id, album_id, album_title): self.user_id = user_id self.album_id = album_id self.album_title = album_title @staticmethod def json_render(obj): """ Создаем экземпляр класса. Данные для переменных забираем из словаря полученных с запросом данных """ user_id = obj['userId'] album_id = obj['id'] album_title = obj['title'] return Album(user_id, album_id, album_title) def albums_by_user(user): """ Функция для запроса к альбомам, забирает альбомы переданного пользователя. Полученные сырые данные преобразуем к списку словарей и из каждого словаря создаем экземпляр класса Album методом json_render() """ url = f'https://jsonplaceholder.typicode.com/albums?userId={user}' response = requests.get(url) albums_json = response.json() return [Album.json_render(album) for album in albums_json] def show_albums(albums): """Выводим заголовки полученных альбомов""" for album in albums: print(f'{album.album_title}') def main(): """Запускаем сбор альбомов пользователя с id=1""" albums = albums_by_user(1) show_albums(albums) if __name__ == '__main__': main()
quidem molestiae enim sunt qui excepturi placeat culpa omnis laborum odio non esse culpa molestiae omnis sed optio eaque aut omnis a natus impedit quibusdam illo est quibusdam autem aliquid et et quia qui fuga est a eum saepe unde necessitatibus rem distinctio laborum qui Process finished with exit code 0
Добавим программе еще немного структурированности и комментариев, такое количество комментариев для такой программы излишне, когда речь идет о реальных программах, но работа с запросами важный навык и дополнительное объяснение в учебном материале лишним не будет.
Думаю разжевано все максимально подробно.
Теперь мы можем передать в albums_by_user() 10 пользователей поочередно и совершить 10 поочередных запросов.
def main(): """Запускаем сбор альбомов пользователя с id=1""" n = 1 while n <= 10: albums = albums_by_user(n) show_albums(albums) n += 1
Допустим вот так.
Подключаем библиотеку aiohttp, очень грубо говоря, асинхронный аналог requests.
import asyncio import aiohttp class Album: """ Класс для объявления альбомов. Источник https://jsonplaceholder.typicode.com/albums Каждый альбом содержит 3 параметра. """ def __init__(self, user_id, album_id, album_title): self.user_id = user_id self.album_id = album_id self.album_title = album_title @staticmethod def json_render(obj): """ Создаем экземпляр класса. Данные для переменных забираем из словаря полученных с запросом данных """ return Album(obj['userId'], obj['id'], obj['title']) def show_albums(albums): """Выводим заголовки полученных альбомов""" for album in albums: print(f'{album.album_title}') async def main(): """Создаем aiohttp.ClientSession(), от него мы можем асинхронно совершать запросы к 100 разным серверам""" async with aiohttp.ClientSession() as session: async with session.get('https://jsonplaceholder.typicode.com/albums?userId=1') as resp: albums_json = await resp.json() albums = [Album.json_render(album) for album in albums_json] show_albums(albums) if __name__ == '__main__': asyncio.run(main())
Делает эта программа точно то же самое, что и ее неасихнронная версия, просто я записал ее покомпактнее, всю асинхронность вынес в корутин main(). Строки
async with aiohttp.ClientSession() as session:
async with session.get('https://jsonplaceholder.typicode.com/albums?userId=1') as resp:
взяты с первой страницы документации, async with дает нам гарантию, что запрос не только не только не заблокируется, но и полностью завершится. aiohttp.ClientSession поддерживает одновременное подключение к 100 серверам и от этой сессии мы в примере обратились к {JSON} Placeholder, соответственное только от этой сессии мы можем подключиться еще к 99 серверам и делать к ним запросы асинхронно.
Подробнее об этом на этой странице документации.
Зная это можем обратиться от сессии одновременно к альбомам первых 3-х авторов.
async def main(): async with aiohttp.ClientSession() as session: async with session.get('https://jsonplaceholder.typicode.com/albums?userId=1') as resp: albums_json = await resp.json() albums = [Album.json_render(album) for album in albums_json] show_albums(albums) async with session.get('https://jsonplaceholder.typicode.com/albums?userId=2') as resp: albums_json = await resp.json() albums = [Album.json_render(album) for album in albums_json] show_albums(albums) async with session.get('https://jsonplaceholder.typicode.com/albums?userId=3') as resp: albums_json = await resp.json() albums = [Album.json_render(album) for album in albums_json] show_albums(albums) ...
Таким образом три запроса к альбомам разных авторов выполняются параллельно и результат мы получаем мгновенно.
Порядок вывода. Tasks
Когда речь заходит об асинхронности, то скорее всего она будет использована для отдельных задач. Нам хочется, чтобы одни процессы происходили отдельно от других, не мешали друг другу, не блокировали друг друга, для этого мы можем вынести их в отдельную задачу и сказать ей исполнятся по команде, тогда когда нам это будет нужно. Для работ с задачами есть инструмент под названием Celery, но разговор о Celery тема отдельного материала, тем не менее Celery и asincio могут работать вместе и упомянуть о возможности создавать задачи средствами asincio считаю нужным занятием.
Первый вопрос, который хочется озвучить: имеет ли значение порядок ключевых слов await для вывода разных корутин?
Сразу посмотрим на примере.
import asyncio async def simple_msg(text): """Простой корутин, который возвращает текст""" print(text) async def count(x): """Простой корутин, который возвращает количество битов""" print(x.bit_count()) async def long_operation(msg): """Корутин с большой задержкой""" print(f'Старт задачи {msg}') await asyncio.sleep(3) print(f'Конец задачи {msg}') async def main(): """Функция, в которой проверяем важен ли порядок вывода корутин""" # Работа двух мгновенных корутин await simple_msg('Сообщение 1') await count(10) # Работа корутина с задержкой await long_operation('Задача 1') # Работа двух мгновенных корутин await simple_msg('Сообщение 2') await count(5) if __name__ == "__main__": # event_loop asyncio.run(main())
Конечно порядок имеет значение. Пусть программирование у нас в данном случае и асинхронное, но исполнение кода сверху вниз никто не отменял.
Напишем три корутины, но две будут исполняться мгновенно, а одна будет имитировать задержу в несколько секунд. Результат будем получать в функции main(), ничего не мешает использовать несколько ключевых слов await дял вывода разных функций. Таким образом при запуске данной программы все await запустятся последовательно.
Сообщение 1 2 Старт задачи Задача 1 Конец задачи Задача 1 Сообщение 2 2 Process finished with exit code 0
Вывод ожидаемый, но тут становится очевидна проблема, два последних исполнения мгновенноисполняемых корутин не начнутся пока мы не дождемся окончания выполнения корутины с задержкой. Зачем нам ждать пока она завершится? Пусть сначала выполняться мгновенные корутины, а потом остальные. И решение очевидно, поменять выводы местами и просто поставить работу более долгой операции в конец. Но есть другой вариант, создать задачу на основе этого корутина и также вызвать ее в конце.
... async def main(): """Функция, в которой проверяем важен ли порядок вывода корутин""" # Работа двух мгновенных корутин await simple_msg('Сообщение 1') await count(10) # Работа корутина с задержкой, обернутой в task task = asyncio.create_task(long_operation('Задача 1')) # Работа двух мгновенных корутин await simple_msg('Сообщение 2') await count(5) await task if __name__ == "__main__": # event_loop asyncio.run(main())
Сообщение 1 2 Сообщение 2 2 Старт задачи Задача 1 Конец задачи Задача 1 Process finished with exit code 0
Создается задача методом .create_task(), в качестве аргумента принимает функцию корутин. К объекту типа Task также применимо ключевое слово await, поскольку Task это awaitable объект, который на самом деле очень родственен с корутином. Просто использование задачи добавляет некоторую гибкость и компактность, когда речь идет именно о вызове и сборе данных из вызова, и конечно объекты типа Task имеют свое api, часть их котого мы рассмотрим.
Я упомянул термин awaitable объект. Это объекты, к которым мы можем применить ключевое слово await, то есть объект ожидающий результат. И в asincio существует три типа объектов, к которым применимо слово await. Два из них уже были упомянуты - coroutine и task, и существует еще один тип - future. Future более низкоуровневый чем Task тип и взаимодействие напрямую с ним чаще всего не имеет большого смысла. Пока задачи типа Task и Future исполняются, они меняют свои состояния. Задача может быть еще не запущена(Pending), запущена(Running), завершена(Done) и отменена(Cancelled). На работе с этими состояниями и завязана особенность в работе с задачами, заключается она в том, что из одного состояния забрать результат мы можем, а из другого нет и манипулирование этим пониманием предоставляет возможность низкоуровневого сильно контролируемого взаимодействия с задачами.
Давайте добавим парочку долгих задач.
... async def main(): """Функция, в которой проверяем важен ли порядок вывода корутин""" # Работа двух мгновенных корутин await simple_msg('Сообщение 1') await count(10) # Работа корутин с задержкой long_task_one = asyncio.create_task(long_operation('Задача 1')) long_task_two = asyncio.create_task(long_operation('Задача 2')) # Работа двух мгновенных корутин await simple_msg('Сообщение 2') await count(5) # Метод .gather() для нескольких задач await asyncio.gather(long_task_one, long_task_two) if __name__ == "__main__": # event_loop asyncio.run(main())
Сообщение 1 2 Сообщение 2 2 Старт задачи Задача 1 Старт задачи Задача 2 Конец задачи Задача 1 Конец задачи Задача 2 Process finished with exit code 0
Создадим две задачи долгоисполняемых корутин и обернем их в метод .gather(), который принимает в себя любые awaitable объект. Но особенность метода .gather() в том, что он исполняет переданные в него awaitable объекты в том порядке, в котором они перечислены. То есть мы можем все наши корутины обернуть в задачи и в том же порядке передать в .gather().
... async def main(): """Функция, в которой проверяем важен ли порядок вывода корутин""" # Работа двух мгновенных корутин short_task_11 = asyncio.create_task(simple_msg('Сообщение 1')) short_task_12 = asyncio.create_task(count(10)) # Работа корутин с задержкой long_task_one = asyncio.create_task(long_operation('Задача 1')) long_task_two = asyncio.create_task(long_operation('Задача 2')) # Работа двух мгновенных корутин short_task_21 = asyncio.create_task(simple_msg('Сообщение 2')) short_task_22 = asyncio.create_task(count(5)) # Метод .gather() для нескольких задач await asyncio.gather( short_task_11, short_task_12, short_task_21, short_task_22, long_task_one, long_task_two, ) if __name__ == "__main__": # event_loop asyncio.run(main())
Сообщение 1 2 Сообщение 2 2 Старт задачи Задача 1 Старт задачи Задача 2 Конец задачи Задача 1 Конец задачи Задача 2 Process finished with exit code 0
Тоже самое, что и без использования .gather(). Но что если мы не хотим следить за этим порядком, допустим мы не знаем, что будет исполнено быстрее, а задача заключается, например, в получении ответа сразу как это становится возможно. И для такой задачи есть отдельный метод, который называется .as_completed().
... async def main(): """Функция, в которой проверяем важен ли порядок вывода корутин""" # Работа двух мгновенных корутин short_task_11 = asyncio.create_task(simple_msg('Сообщение 1')) short_task_12 = asyncio.create_task(count(10)) # Работа корутин с задержкой long_task_one = asyncio.create_task(long_operation('Задача 1')) long_task_two = asyncio.create_task(long_operation('Задача 2')) # Работа двух мгновенных корутин short_task_21 = asyncio.create_task(simple_msg('Сообщение 2')) short_task_22 = asyncio.create_task(count(5)) # Метод .as_completed() для нескольких задач for task in asyncio.as_completed(( short_task_11, short_task_12, short_task_21, short_task_22, long_task_one, long_task_two) ): await task if __name__ == "__main__": # event_loop asyncio.run(main())
Сообщение 1 2 Старт задачи Задача 1 Старт задачи Задача 2 Сообщение 2 2 Конец задачи Задача 1 Конец задачи Задача 2 Process finished with exit code 0
Метод .as_completed() будет возвращать результат сразу, когда это становится возможным, таким образом нам не нужно следить за порядком передачи задач в этот метод. Очень удобно и компактно, просто в цикле перебираем итератор из awaitable объектов и возвращаем результат, когда это возможно.
И осталась еще одна возможность вывода результата, выше я упомянул о четырех состояниях, и этот метод возвращает нам множества awaitable объектов в состояниях done и pending.
... async def main(): """Функция, в которой проверяем важен ли порядок вывода корутин""" # Работа двух мгновенных корутин short_task_11 = asyncio.create_task(simple_msg('Сообщение 1')) short_task_12 = asyncio.create_task(count(10)) # Работа корутин с задержкой long_task_one = asyncio.create_task(long_operation('Задача 1')) long_task_two = asyncio.create_task(long_operation('Задача 2')) # Работа двух мгновенных корутин short_task_21 = asyncio.create_task(simple_msg('Сообщение 2')) short_task_22 = asyncio.create_task(count(5)) # Метод .wait() для нескольких задач done, pending = await asyncio.wait(( short_task_11, short_task_12, short_task_21, short_task_22, long_task_one, long_task_two) ) print(done) print(pending) if __name__ == "__main__": # event_loop asyncio.run(main())
Сообщение 1 2 Старт задачи Задача 1 Старт задачи Задача 2 Сообщение 2 2 Конец задачи Задача 1 Конец задачи Задача 2 {<Task finished name='Task-3' coro=<count() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:9> result=None>, <Task finished name='Task-6' coro=<simple_msg() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:4> result=None>, <Task finished name='Task-2' coro=<simple_msg() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:4> result=None>, <Task finished name='Task-4' coro=<long_operation() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:14> result=None>, <Task finished name='Task-7' coro=<count() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:9> result=None>, <Task finished name='Task-5' coro=<long_operation() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:14> result=None>} set() Process finished with exit code 0
Метод .wait(). Как упомянулось выше возвращает два множества, в данном случае все задачи выполнены, поэтому множество pending пустое.
И последнее, состояния задачи мы можем явно просматривать, и явно на него влиять, для этого существуют отдельные методы.
... async def main(): """Функция, в которой проверяем важен ли порядок вывода корутин""" # Работа двух мгновенных корутин short_task_11 = asyncio.create_task(simple_msg('Сообщение 1')) short_task_12 = asyncio.create_task(count(10)) # Работа корутин с задержкой long_task_one = asyncio.create_task(long_operation('Задача 1')) long_task_two = asyncio.create_task(long_operation('Задача 2')) # Работа двух мгновенных корутин short_task_21 = asyncio.create_task(simple_msg('Сообщение 2')) short_task_22 = asyncio.create_task(count(5)) # Методы для работы с каждой конкретной задачей print(f'Результат метода done() - {short_task_22.done()}') print(f'Результат метода cancelled() - {short_task_22.cancelled()}') print(f'Результат метода cancel() - {short_task_22.cancel("Принудительно остановил")}') # Метод .wait() для нескольких задач done, pending = await asyncio.wait(( short_task_11, short_task_12, short_task_21, short_task_22, long_task_one, long_task_two) ) print('\n', done, '\n', sep='') # Проверка состояний после работы метода .wait() print(f'Повторный результат метода done() - {short_task_22.done()}') print(f'Повторный результат метода cancelled() - {short_task_22.cancelled()}') if __name__ == "__main__": # event_loop asyncio.run(main())
Результат метода done() - False Результат метода cancelled() - False Результат метода cancel() - True Сообщение 1 2 Старт задачи Задача 1 Старт задачи Задача 2 Сообщение 2 Конец задачи Задача 1 Конец задачи Задача 2 {<Task finished name='Task-3' coro=<count() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:9> result=None>, <Task finished name='Task-6' coro=<simple_msg() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:4> result=None>, <Task finished name='Task-2' coro=<simple_msg() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:4> result=None>, <Task finished name='Task-4' coro=<long_operation() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:14> result=None>, <Task cancelled name='Task-7' coro=<count() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:9>>, <Task finished name='Task-5' coro=<long_operation() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:14> result=None>} Повторный результат метода done() - True Повторный результат метода cancelled() - True Process finished with exit code 0
В качестве примера используем задачу short_task_22 и применим к ней некоторые методы.
Метод .done() - возвращает True, если задача в состоянии Done.
Метод .cancelled() соответственно возвращает True, если задача была отменена.
Метод .cancel() принудительно переводит задачу в состояние cancelled.
Как видим состояние cancelled после метода .wait() изменилось, а done осталось прежним. Если посмотреть множество done, то мы увидим там Task-7, которая в отличие от остальных задач начинается со строки Task cancelled...
И по скольку состояние cancelled принимается задачей после done, то и при втором применении .done() к задаче мы видим True. Но на что тогда влияет отмена?
Из отмененной задачи мы не сможем забрать результат.
... print(f'Повторный результат метода cancelled() - {short_task_22.cancelled()}') # Метод .result() возвращает исключение CancelledError, # если ранее к объекту применялся метод .cancel() try: short_task_22.result() except asyncio.CancelledError: print('Исключение CancelledError') ...
Результат метода done() - False Результат метода cancelled() - False Результат метода cancel() - True Сообщение 1 2 Старт задачи Задача 1 Старт задачи Задача 2 Сообщение 2 Конец задачи Задача 1 Конец задачи Задача 2 {<Task finished name='Task-3' coro=<count() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:9> result=None>, <Task finished name='Task-6' coro=<simple_msg() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:4> result=None>, <Task finished name='Task-2' coro=<simple_msg() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:4> result=None>, <Task finished name='Task-4' coro=<long_operation() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:14> result=None>, <Task cancelled name='Task-7' coro=<count() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:9>>, <Task finished name='Task-5' coro=<long_operation() done, defined at /home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/asincyio_task_ex_test.py:14> result=None>} Повторный результат метода done() - True Повторный результат метода cancelled() - True Исключение CancelledError Process finished with exit code 0
К задачам в состоянии Done мы можем применить метод .result() и посмотреть результат, но к задачам в состоянии cancelled, при применении метода .result() мы получим исключение CancelledError.
В блоке try except мы можем обработать это исключение.
Можно было бы обсудить еще api обработчика событий event_loop, но там не так много возможностей, скорее всего использование метода .run() будет достаточно.
Таким образом мы разобрались с задачами, состояниями, возможностями обработки результатов awaitable объектов, мы познакомились не со всем доступным api, с остальными методами вы всегда можете самостоятельно ознакомиться в документации. Этот материал введение в асинхронность и надеюсь он сформировал представление о возможностях этой концепции и конкретно о возможностях библиотеки asincio.
Теперь давайте подробнее поговорим о такой вещи как потоки и многопоточность.
Многопоточное программирование
GIL
В введении этого материала была такая фраза
"...только вот мы тут говорим в первую очередь о python и python не поддерживает параллельную работу потоков..."
Это абсолютная правда, когда мы говорим о python про параллельную работу потоков внутри одного процесса мы можем забыть, по крайней мере, на начало осени 2022. Работа потоков в python выполняется только последовательно и все из-за такого механизма как GIL или Global Interpreter Lock.
GIL это некая, пусть будет, переменная, которая вводит правило - 'В любой момент времени может работать только один поток и разрешение на работу этому потоку выдаю я'.
Таким образом по настоящему параллельная работа поток не возможна.
Разговоры об удалении GIL из python ведутся, насколько я вычитал с начал двухтысячных, а возможно и еще раньше. Создатель Python, Guido van Rossum в 2007 сказал в какой-то статье, что GIL будет удален из python тогда, когда производительность однопоточных приложений будет настолько же высока без GIL как с ним. Прошло 15 лет, а GIL все еще существует, значит плюсов от него больше чем минусов. Говоря поверхностно, минус - нет параллельно работающих потоков, плюс - однопточные приложения производительнее и мы имеем меньшую вероятность столкнуться с потокозависимыми проблемами в однопоточных приложениях. Это все на самом деле нам не столь важно, главное во всем этом:
1. факт наличия такой вещи как GIL
2. понимание какие ограничения этот GIL на нас накладывает.
И еще на что хотелось бы обратить внимание в этом вступлении - работа с потоками интересна нам в рамках CPU-bound операций, поэтому GIL не сильно влияет на работу I/O-bound операций.
Foreground. Background
То, что мы не можем работать с потоками параллельно, совсем не означает, что работа с потоками для нас бесполезна, напротив, на работе с потоками завязано очень многое в современных программах. И первое о чем следует упомянуть это, конечно, как создавать поток и какие типы потоков вообще существует. Эти вопросы взаимосвязаны и для демонстрации предлагаю сразу перейти к примерам.
import time def long_def(n, x): """Программа, которая выполняется не мгновенно""" start = time.time() lst = [] while n <= x: lst.append(n) n += 1 end = time.time() print(f'Выполнено за {end - start}') return sum(lst) if __name__ == '__main__': # Точка входа в программу, по умолчанию главный поток print('Запуск потока main()') print(long_def(1, 30000000)) print('Завершение потока main()')
Запуск потока main() Выполнено за 2.3732783794403076 450000015000000 Завершение потока main() Process finished with exit code 0
Для демонстрации напишем любую программу, которая исполняется не мгновенно, а хотя бы пару секунд, этого будет достаточно.
Итак, перед нами классическая однопоточная программа, программа всегда запускается в потоке и это нам не нужно объявлять явно. В данном примере поток запускается при запуске программы, то есть внутри конструкции if __name__ == '__main__'. И программа будет работать до тех пор, пока все задачи внутри этого потока не будут выполнены. Конструкции if __name__ == '__main__' в данном случае главный или Foreground поток. Запомните, программа не будет завершена до тех пор, пока активен Foreground поток.
И Foreground поток это тип, который создается по умолчанию при создании потока.
Какая проблема существует у этой программы? Мы не видим строчку 'Завершение потока main()' пока не будет выполнена относительно долгая программа до вызова этой строки.
Давайте уберем выполнение этой долгой программы в отдельный поток и запустим его.
import time import threading def long_def(n, x): """Программа, которая выполняется не мгновенно""" start = time.time() lst = [] while n <= x: lst.append(n) n += 1 end = time.time() print(f'Выполнено за {end - start}\nЗначение - {sum(lst)}') if __name__ == '__main__': # Точка входа в программу, по умолчанию главный поток print('Запуск потока main()') # Создаем поток, по умолчанию создан Foreground поток и теперь # Это главный поток, или вернее сказать тоже thread = threading.Thread(target=long_def, args=(1, 30000000)) # Запускаем поток thread.start() print('Завершение потока main()')
Запуск потока main() Завершение потока main() Выполнено за 2.7335126399993896 Значение - 450000015000000 Process finished with exit code 0
Для создания потоков используется библиотека из стандартного набора - threading. Для создания потока используется threading.Thread().
Так создадим поток thread, в аргумент target= передается функция, которую мы хотим вынести в отдельный поток, в параметр args= передаются аргументы этой функции.
Для запуска потока используется метод .start().
Поток с именем thread - Foreground поток, это означает, что пока он не будет выполнен программа не закончится, как мы видим, именно так и происходит. Но теперь у нас действительно работают два отдельных потока, и какой тип тогда у потока main()? Такой же, как и был - Foreground. Мы можем иметь в одной программе несколько Foreground потоков и пока хотя бы один из них работает программа также работает.
В данном примере это не очевидно, поток main() выполняется значительно быстрее, чем поток thread. Пусть будет наоборот.
import time import threading def long_def(n, x): """Программа, которая выполняется не мгновенно""" start = time.time() lst = [] while n <= x: lst.append(n) n += 1 end = time.time() print(f'Выполнено за {end - start}\nЗначение - {sum(lst)}') if __name__ == '__main__': # Точка входа в программу, по умолчанию главный поток print('Запуск потока main()') # Создаем поток, по умолчанию создан Foreground поток и теперь # Это главный поток, или вернее сказать тоже thread = threading.Thread(target=long_def, args=(1, 30000000)) # Запускаем поток thread.start() time.sleep(5) print('Завершение потока main()')
Запуск потока main() Выполнено за 2.6352386474609375 Значение - 450000015000000 Завершение потока main() Process finished with exit code 0
Для этого добавим всего одну строку - time.sleep(5). Обратите внимание, эта пауза относится именно к потоку main(), на работу потока thread она никак не влияет.
Как мы видим, несмотря на то, что thread выполняется теперь быстрее, пока main() не завершится программа работает, это доказывает, что оба потока имеют тип Foreground.
Второй тип потока - Background. Отличается он от Foreground, как вы наверное уже догадались тем, что если к моменту завершения последнего Foreground потока будут незавершенные Background потоки, программа все равно завершиться. Давайте убедимся в этом.
import time import threading def long_def(n, x): """Программа, которая выполняется не мгновенно""" start = time.time() lst = [] while n <= x: lst.append(n) n += 1 end = time.time() print(f'Выполнено за {end - start}\nЗначение - {sum(lst)}') if __name__ == '__main__': # Точка входа в программу, по умолчанию главный поток print('Запуск потока main()') # Создаем поток и явно укажем, что это Background поток (парамтер deamon) thread = threading.Thread(target=long_def, args=(1, 30000000), daemon=True) # Запускаем поток thread.start() print('Завершение потока main()')
Запуск потока main() Завершение потока main() Process finished with exit code 0
Для создания потока типа Background используется параметр daemon в значении True. Теперь исполнение потока thread даже не успевает начаться, потому что вывод двух print() потока main() происходит быстрее и как только последняя команда Foreground потока завершена, завершается работа всей программы и наличие незавершенных Background потоков ни на что не влияют.
А если мы все таки хотим, чтобы программа не завершалась и дождалась выполнения не всех, а какого-нибудь явно указанного Background потока. Вдруг задача этого потока не настолько важная, чтобы делать этот поток Foreground потоком, но в каком-то конкретном случае нам все таки необходимо дождаться ее исполнения.
import time import threading def long_def(n, x): """Программа, которая выполняется не мгновенно""" start = time.time() lst = [] while n <= x: lst.append(n) n += 1 end = time.time() print(f'Выполнено за {end - start}\nЗначение - {sum(lst)}') if __name__ == '__main__': # Точка входа в программу, по умолчанию главный поток print('Запуск потока main()') # Создаем поток, по умолчанию создан Foreground поток и теперь # Это главный поток, или вернее сказать тоже thread = threading.Thread(target=long_def, args=(1, 30000000), daemon=True) # Запускаем поток thread.start() # Говорим, что пока этот поток работает программа не должна завершаться thread.join() print('Завершение потока main()') # thread.join() # В таком случае сначала мы увидим строку # 'Завершение потока main()' # И после нее начнется выполнение потока thread
Запуск потока main() Выполнено за 2.387380599975586 Значение - 450000015000000 Завершение потока main() Process finished with exit code 0
Для этой цели используется метод .join(). Теперь даже несмотря на то, что Foreground поток завершился программа дожидается окончания Background потока, к которому был применен метод .join(). При чем вызвать метод .join() можно и после последней команды Foreground потока.
threading и скорость работы
Важно понимать, использование нескольких потоков внутри одного процесса только замедляет работу программу, потому что факт последовательного выполнения, при запуске нескольких потоков, остается, но при этом появляется переключение между этими потоками (context switch), которое также занимает время.
Поэтому потоки мы используем только для контроля над порядком исполнения блокирующих функций, но точно не для скорости работы всей программы.
Убедиться, что 'параллельные' потоки работают медленнее последовательных очень просто. Напишем для этого следующую программу.
import threading import random import time from functools import wraps def how_long(func): """Декоратор для измерения времени""" @wraps(func) def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) end = time.time() print(f'Выполнено за {end - start} секунд') return res return wrapper def write_in_file(n, filename): """Функция для записи в файл 3000000 трехзначных чисел""" print('start write') with open(filename, 'w') as file: while n <= 3000000: file.write(str(random.randint(1, 1000))) n += 1 print('end write') @how_long def example_with_threads(): """Запись 3000000 чисел в два файла, используя "параллельные" потоки""" print('Старт потока example_with_threads()') thread1 = threading.Thread(target=write_in_file, args=(1, 'example1.txt'), daemon=True) thread2 = threading.Thread(target=write_in_file, args=(1, 'example2.txt'), daemon=True) thread1.start() thread2.start() thread1.join() thread2.join() print('Конец потока example_with_threads()', '\n', sep='') @how_long def example_without_threads(): """Запись 3000000 чисел в два файла, используя последовательные потоки""" print('Старт потока example_without_threads()') write_in_file(1, 'example3.txt') write_in_file(1, 'example4.txt') print('Конец потока example_without_threads()') if __name__ == '__main__': # Запуск Foreground потока print('Старт потока main()') # Запуск программы, которая использует "Параллельные" потоки example_with_threads() # Запуск программы, которая использует Последовательные потоки example_without_threads() print('Конец потока main()')
Старт потока main() Старт потока example_with_threads() start write start write end write end write Конец потока example_with_threads() Выполнено за 7.963376045227051 секунд Старт потока example_without_threads() start write end write start write end write Конец потока example_without_threads() Выполнено за 6.39645528793335 секунд Конец потока main() Process finished with exit code 0
Напишем функцию write_in_file(), в которой будем записывать в файл три миллиона трехзначных чисел. Напишем классический декоратор для замера времени how_long().
Будем использовать запись в 4 файла, в два из них будем записывать числа с помощью условно параллельных потоков, а в оставшиеся два с помощью классической последовательной работы потоков.
Из примера прекрасно видно, что последовательные потоки работают быстрее, при чем разница во времени будет увеличиваться с увеличением числа потоков, поскольку context switch будет происходить больше раз.
И есть еще один момент, который мы сразу тут же можем обсудить. Обратите внимание на запуск потоков в example_with_threads(), два .start(), два .join(), а это всего два потока, допустим их будем 5, сколько лишних одинаковых строк придется писать. Это захламление настолько бросается в глаза, что кажется для избавления от него точно должно быть какое-то решение, как в случае с asincio и множественным использованием await. И такое решение действительно есть.
import concurrent.futures import random import time from functools import wraps def how_long(func): """Декоратор для измерения времени""" @wraps(func) def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) end = time.time() print(f'Выполнено за {end - start} секунд') return res return wrapper def write_in_file(n, filename): """Функция для записи в файл 3000000 трехзначных чисел""" print('start write') with open(filename, 'w') as file: while n <= 3000000: file.write(str(random.randint(1, 1000)) + '\n') n += 1 print('end write') @how_long def example_with_threads(): """Запись 3000000 чисел в два файла, используя "параллельные" потоки""" print('Старт потока example_with_threads()') with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: executor.submit(write_in_file, 1, 'example1.txt') executor.submit(write_in_file, 1, 'example2.txt') print('Конец потока example_with_threads()') @how_long def example_without_threads(): """Запись 3000000 чисел в два файла, используя последовательные потоки""" print('Старт потока example_without_threads()') write_in_file(1, 'example3.txt') write_in_file(1, 'example4.txt') print('Конец потока example_without_threads()') if __name__ == '__main__': # Запуск Foreground потока print('Старт потока main()') # Запуск программы, которая использует "Параллельные" потоки example_with_threads() # Запуск программы, которая использует Последовательные потоки example_without_threads() print('Конец потока main()')
Старт потока main() Старт потока example_with_threads() start write start write end write end write Конец потока example_with_threads() Выполнено за 6.220174312591553 секунд Старт потока example_without_threads() start write end write start write end write Конец потока example_without_threads() Выполнено за 5.383180141448975 секунд Конец потока main() Process finished with exit code 0
Решение это называется - ThreadPoolExecutor() из concurrent.futures. В качестве аргумента передаем max_workers, это как раз количество потоков. При чем это количество необязательно должно быть равным количеству выполняемых задач, если потоков будет больше чем задач, то лишние потоки просто не будут задействованы, а если меньше, допустим мы выделили также два потока, а задач у нас три, тогда сначала выполнятся две задачи, а после того как один из потоков освободится - выполнится третья задача.
Параметр max_workers можно не указывать явно, тогда это число будет равно количеству ядер устройства + 4, но не более 32.
Открываем мы всю эту конструкцию в менеджере контекста, поскольку созданный таким образом объект закрывается функцией shutdown(). И чтобы не забыть закрыть этот объект мы используем with. Не использовать with нам хочется только в том случае, когда мы хотим в shutdown() передать параметр wait в значении False, по умолчанию True, по назначению тоже самое, что .join().
Методом .submit() мы запускаем задачу в этой созданной многопоточной среде.
Как видно из результата на время это никак не влияет, оно все равно выше, но использование ThreadPoolExecutor() добавляет компактности нашему коду.
Timer
Итак, мы разобрались, что целью использования потоков не является ускорение работы программы, целью является скорее своего рода всевозможная синхронизация этих самых потоков, запуск разных задач внутри разных потоков и контроль над порядком и взаимосвязями их выполнения.
Синхронизация потоков и контроль над ними представлены несколькими классами, на каждый из них мы совсем скоро посмотрим. Правда перед этим давайте взглянем на один класс, который представляет собой скорее отдельную группу и не связан напрямую с синхронизацией, но когда речь идет о работе с потоками, как будто, именно такой класс напрашивается как пример самой простой реализации задач, которые берут на себя потоки.
Класс Timer(). Не станем долго на нем задерживаться, одного примера будет достаточно.
import threading def gift_prepare(): print(f'Поздравляем! Для вас подарок!\n' f'Он будет готов через 30 секунд...\n') def gift_ready(): email = input(f'Подарок готов, для его получения введите ваш email\n' f'email - ') if email: print('Спасибо! Подарок уже у вас на почте') else: print('Извините, необходимо ввести email') if __name__ == '__main__': gift_prepare() send_gift_thread = threading.Timer(interval=30, function=gift_ready) send_gift_thread.start()
Поздравляем! Для вас подарок!
Он будет готов через 30 секунд...
Подарок готов, для его получения введите ваш email
email - test@test.ru
Спасибо! Подарок уже у вас на почте
Process finished with exit code 0
Особенность класса Timer() в параметре interval, внутри него в секундах записывается время, через которое должно начаться выполнение, переданной в аргумент function, функции.
Например, нам нужно как в примере сымитировать время ожидания какого-то действия, с помощью класса Timer() реализовать такое поведение можно очень просто.
Класс Timer() имеет метод .cancel(), с помощью него можно принудительно прервать функцию запущенную через Timer(), но только при том условии, что выполнение функции еще не началось, то есть время внутри interval еще не вышло.
Event
Теперь можно перейти к теме синхронизации потоков. Потоки исполняют какие-то функции, а эти функции в свою очередь используют какие-то данные и бывает, что функциям из разных потоков может потребоваться доступ к этим данных и такое поведение может повлечь за собой ряд проблем, о которых мы поговорим позже, но факт тут в том, что для корректной работы такого поведения нам нужны инструменты для взаимодействия между потоками. Инструменты синхронизации потоков.
Первый класс, который мы рассмотрим - Event()(событие). С помощью событий мы можем оповещать выполнены какие-нибудь условия внутри потока или нет.
Создадим простой пример.
import threading # Создаем экземпляр Event() event = threading.Event() def gifts_prepare(): # Изначальное состояние экземпляра Event() False, # пока состояние False все, что ниже .wait() не начнется event.wait() print(f"\nПодарок для {threading.current_thread().name} отправлен") if __name__ == '__main__': # Главный поток, в котором имитируется работа print(f'Отправка подарков 5 пользователям') # Цикл для создания и запуска 5 потоков for user in range(5): threading.Thread(target=gifts_prepare, name=f'Пользователь {user}').start() print(f'\nПодарок для пользователя {user} готов') # Только когда существует 5 активных потоков флаг экземпляра Event() становится True # И инструкция после .wait() могут быть выполнены if threading.active_count() >= 5: event.set()
Отправка подарков 5 пользователям Подарок для пользователя 0 готов Подарок для пользователя 1 готов Подарок для пользователя 2 готов Подарок для пользователя 3 готов Подарок для пользователя 4 готов Подарок для Пользователь 0 отправлен Подарок для Пользователь 1 отправлен Подарок для Пользователь 4 отправлен Подарок для Пользователь 3 отправлен Подарок для Пользователь 2 отправлен Process finished with exit code 0
Для работы с классом Event() создадим его экземпляр.
Особенность Event() заключается в флаге этого события, который может быть только в состоянии True либо False. Когда мы создаем экземпляр класса Event() этот флаг устанавливается в состояние False. Этот флаг нам нужен для манипулирования выполнением задач с помощью метода .wait(), у .wait() есть только одно правило - 'пока экземпляр класса Event() находится в состоянии False все, что находится ниже .wait() выполнено не будет, пока состояние не изменится на True.'.
Изменять состояние можно методом .set(), а для проверки состояния существует метод .is_set(), который вернет True, если состояние экземпляра True.
А если нам нужно сбросить состояние обратно в False, то используем метод .clear().
В примере создаем программу, которая имитирует отправку подарков, например, на email, и допустим мы хотим сначала все эти подарки сформировать и только, когда все они будут готовы отправить их. С помощью Event() как раз можно это реализовать, мы создаем 5 потоков, каждый из которых берет на исполнение функцию gifts_prepare(), но сразу при заходе в эту функцию поток сталкивается с .wait(), проверяет его состояние и получает False, потому что изначально Event() создается в этом состоянии. Ниже мы делаем проверку активных потоков и как только их количество достигает 5, то .wait() пропускает наши потоки дальше и они успешно исполняются.
Если мы хотим ограничить ожидание временем, то можем использовать параметр timeout (по умолчанию None) метода .wait(), в него можно передать время, через которое блокировка спадет автоматически.
import threading import time # Создаем экземпляр Event() event = threading.Event() def gifts_prepare(): # Изначальное состояние экземпляра Event() False, # пока состояние False все что ниже .wait() не начнется event.wait(timeout=3) print(f"\nПодарок для {threading.current_thread().name} отправлен") if __name__ == '__main__': # Главный поток, в котором имитируется работа print(f'Отправка подарков 5 пользователям') # Цикл для создания и запуска 5 потоков for user in range(5): threading.Thread(target=gifts_prepare, name=f'Пользователь {user}').start() print(f'\nПодарок для пользователя {user} готов') time.sleep(1) # Только когда существует 5 активных потоков флаг экземпляра Event() становится True # И инструкция после .wait() могут быть выполнены if threading.active_count() >= 5: event.set()
Отправка подарков 5 пользователям Подарок для пользователя 0 готов Подарок для пользователя 1 готов Подарок для пользователя 2 готов Подарок для Пользователь 0 отправлен Подарок для пользователя 3 готов Подарок для Пользователь 1 отправлен Подарок для пользователя 4 готов Подарок для Пользователь 2 отправлен Подарок для Пользователь 3 отправлен Подарок для Пользователь 4 отправлен Process finished with exit code 0
Например, давайте добавим задержку после формирования каждого подарка, а для .wait() установим время в 3 секунды, таким образом формирование 5 потоков и естественного перевода события в состояние True будет происходить дольше, чем пройдет вручную заданный таймер.
И на примере мы действительно видим, что подарки стали отправляться после третьего, а не после пятого.
Semaphore
Еще один класс для управления потоков - Semaphore(). Особенность в том, что мы можем задать количество мест для потоков и пока эти места заняты остальные потоки вынуждены ждать пока место освободится.
import threading import time class GiftsSend: """В классе имитируем отправку подарков для победителей, но создаем условие, что одновременно можно формировать только 2 подарка, и только после того как они сформируются можно начинать формировать следующие""" def __init__(self): # Создаем экземпляр класса Semaphore() self.moment_count = threading.Semaphore(value=2) def gift_send(self, winner): # Приводим поток в работу, тем самым занимаем слот в Semaphore() print(f'\nОповещаем победителя {winner}') self.moment_count.acquire() # Имитируем затрату времени на работу print(f'\nФормируем подарок для {winner}') time.sleep(2) print('\nМеста на отправку заняты') # Отправляем подарок, тем самым освобождая слот в Semaphore() print(f'\nОтправляем подарок для {winner}') self.moment_count.release() def gift_winners(self, count): """Формируем количество потоков по числу переданных победителей""" for winner in range(count): threading.Thread(target=self.gift_send, args=[winner]).start() if __name__ == '__main__': FirstSend = GiftsSend() FirstSend.gift_winners(4)
Оповещаем победителя 0 Формируем подарок для 0 Оповещаем победителя 1 Формируем подарок для 1 Оповещаем победителя 2 Оповещаем победителя 3 Места на отправку заняты Места на отправку заняты Отправляем подарок для 1 Отправляем подарок для 0 Формируем подарок для 2 Формируем подарок для 3 Места на отправку заняты Отправляем подарок для 2 Места на отправку заняты Отправляем подарок для 3 Process finished with exit code 0
Экземпляр класса Semaphore() имеет параметр value, по умолчанию этот параметр равен 1, это как раз то самое допустимое количество мест для потоков.
Если мы хотим указать, что место занято используем метод acquire(), а для того, чтобы освободить место используем метод release(). В release() можно передать параметр n, в котором указывается количество высвобождаемых потоков. А для acquire() можно задать параметр timeout, в котором явно указывается время, через которое место освободится.
Таким образом в примере мы формируем условие, что одновременно могут формироваться только два подарка, таким образом все остальные победители попадают в начало функции gift_send(), мы видим вывод фразы 'Оповещаем победителя {winner}' и ждем, когда освободится место для формирования подарка, а освободится оно тогда, когда сформированный подарок будет отправлен.
У Semaphore() есть дочерний класс - BoundedSemaphore(). Разница у них следующая. Как упоминалось выше у .release() есть параметр n, куда мы можем передать, например, значение 2, таким образом даже в нашем примере после нескольких итераций количество свободных мест станет выше, чем значение value. И как раз если мы хотим избежать такого поведения, то вместо Semaphore() следует использовать BoundedSemaphore(), который автоматически формирует ValueError, когда мест больше чем в параметре value.
import threading import time class GiftsSend: """В классе имитируем отправку подарков для победителей, но создаем условие, что одновременно можно формировать только 2 подарка, и только после того как они сформируются можно начинать формировать следующие""" def __init__(self): # Создаем экземпляр класса Semaphore() self.moment_count = threading.BoundedSemaphore(value=2) def gift_send(self, winner): # Приводим поток в работу, тем самым занимаем слот в Semaphore() print(f'\nОповещаем победителя {winner}') self.moment_count.acquire() # Имитируем затрату времени на работу print(f'\nФормируем подарок для {winner}') time.sleep(2) print('\nМеста на отправку заняты') # Отправляем подарок, тем самым освобождая слот в Semaphore() print(f'\nОтправляем подарок для {winner}') try: self.moment_count.release(2) except ValueError: print(f'Количество release больше value') def gift_winners(self, count): """Формируем количество потоков по числу переданных победителей""" for winner in range(count): threading.Thread(target=self.gift_send, args=[winner]).start() if __name__ == '__main__': FirstSend = GiftsSend() FirstSend.gift_winners(4)
Оповещаем победителя 0 Формируем подарок для 0 Оповещаем победителя 1 Формируем подарок для 1 Оповещаем победителя 2 Оповещаем победителя 3 Места на отправку заняты Отправляем подарок для 0 Места на отправку заняты Отправляем подарок для 1 Количество release больше value Формируем подарок для 3 Формируем подарок для 2 Места на отправку заняты Отправляем подарок для 3 Места на отправку заняты Отправляем подарок для 2 Количество release больше value Process finished with exit code 0
Так мы видим, что в ходе выполнения программы с параметром 2 в методе .release(), значение value было превышено дважды.
Barrier
Следующий класс для синхронизации потоков - Barrier(). Его особенность полностью отражена в названии, идея заключается в том, что мы устанавливаем некий барьер, и пока все потоки не подойдут к этому барьеру исполнение инструкций после барьера не начнется.
import threading import time import random import datetime class GiftsSend: """В классе имитируем отправку подарков для победителей, но создаем условие, что сначала мы собираем нужное количество подарков для всех победителей и только после этого отправляем их""" def __init__(self, count_winners): # Передаем число победителей self.count_winners = count_winners # Создаем экземпляр класса Barrier() self.barrier = threading.Barrier(self.count_winners) def gift_send(self, winner): """Установим разное время на сбор подарка для каждого победителя Таким образом пока все подарки собраны не будут их отправка не случится""" print(f'Собираем подарок для {winner}') # Имитируем разное время на подготовку каждого подарка time.sleep(random.randint(2, 6)) print(f'Подарок для {winner} собран в {datetime.datetime.now()}') # Пока все потоки не упрутся в .wait() выполнение функции не продолжится self.barrier.wait() print(f'\nОтправляем подарок для {winner}, в {datetime.datetime.now()}') if __name__ == '__main__': # Экземпляр с количеством победителей FirstSend = GiftsSend(3) # Создаем поток для каждого победителя и запускаем for winner in range(FirstSend.count_winners): threading.Thread(target=FirstSend.gift_send, args=str(winner)).start()
Собираем подарок для 0 Собираем подарок для 1 Собираем подарок для 2 Подарок для 0 собран в 2022-09-11 19:04:39.974711 Подарок для 1 собран в 2022-09-11 19:04:41.973753 Подарок для 2 собран в 2022-09-11 19:04:43.977774 Отправляем подарок для 2, в 2022-09-11 19:04:43.977928 Отправляем подарок для 0, в 2022-09-11 19:04:43.977998 Отправляем подарок для 1, в 2022-09-11 19:04:43.978059 Process finished with exit code 0
В качестве обязательно параметра Barrier() принимает количество потоков.
Метод .wait() ставит точку, в которую должны упереться все потоки, перед тем как продолжить исполнение дальнейших инструкций.
Если необходимо вручную поставить поток в нерабочее состояние используется метод .abort(). Вызовы метода .wait() при этом завершаться ошибкой threading.BrokenBarrierError.
Для возвращения барьера в пустое состояние используется метод .reset(), его использование будет означать, что все ожидающие исполнения через .wait() потоки будут принудительно остановлены с вызовом ошибки threading.BrokenBarrierError.
Для просмотра состояния барьера существуют атрибуты
.parties - возвращает количество потоков, ожидающих прохождение барьера
.n_waiting - возвращает количество потоков еще не переданных в исполнение барьера.
.broken - True, если барьер в нерабочем состоянии.
В примере мы снова имитируем отправку подарков, но в данном случае мы хотим отослать их всем победителям только после того, как все подарки будут собраны. Используем .randint() для имитации разного времени сбора подарка, таким образом получается, что даже если первый подарок собран за 2 секунды он все равно вынужден ждать пока будут собраны все подарки. В выводе мы видим, что собраны подарки в разное время, а отправлены все вместе сразу же как только был собран последний подарок.
Lock. RLock
Чаще всего для синхронизации потоков вы будете пользоваться не классами рассмотренными выше, хотя и знать об их возможностях, конечно, лишним не будет, а классами - Lock и RLock. Когда речь идет о работе с несколькими потоками, которые имеют доступ к одной переменной и как-то с ней взаимодействуют, возникает вероятность возникновения абсолютно неожиданных результатов, только потому, что context switch может давать разным потокам влиять на эту переменную еще до того, как какой-нибудь из, ранее взявшихся за эту переменную, потоков еще не закончил свои манипуляции. Отсюда возникает необходимость в блокировке действия над переменной, пока над ней работает какой-нибудь из потоков, и только когда допущенный к переменной поток доделает все свои дела другой поток сможет приступить к своим делам над этой переменной.
Посмотрим на пример
import random import string import threading import time class ListOfUsers: """Класс для хранения списка пользователей""" def __init__(self): # Устанавливаем список начальных пользователей self.list_of_names = ['vova', 'jora'] def new_name(self, user_name: str): """Функция для добавления нового пользователя""" print(f'Новый пользователь - {user_name}') # Имитируем задержку добавления для провоцирования context switch time.sleep(2) self.list_of_names.append(user_name) print(f'Пользователь {user_name} добавлен') print(f'Список пользователей после - {lock_example.list_of_names}') if __name__ == '__main__': # Создаем экземпляр и показываем начальный список lock_example = ListOfUsers() print(f'Список пользователей в начале - {lock_example.list_of_names} ') # Забираем в переменную alphabets все буквы alphabets = string.ascii_letters # Три раза формируем разное случайное имя и выделяем 3 потока на добавление # их в список с помощью метода new_name() for _ in range(3): new_name = ''.join(random.choice(alphabets) for i in range(random.randint(2, 5))) threading.Thread(target=lock_example.new_name, args=[new_name]).start()
Класс ListOfUsers хранит список пользователей, который изначально содержит два имени, он может быть и пустым, это не определяющее проблему условие, просто так, наверное, более наглядно. У класса есть всего один метод, который добавляет в этот список новое имя. Имя я решил формировать случайной последовательностью, используя модули random и string, просто программа так выглядит повеселее. В цикле создаются три потока, каждый использует функцию .new_name() и в качестве аргумента передает в нее случайно сформированное имя. В методе .new_name() существует задержка, это самый важный момент программы. Важный потому что, если бы этой паузы не было поток мгновенно доделывал бы свои дела, а именно добавлял новое имя в список, но с задержкой успевает происходить context switch, который как раз и ломает порядок добавления пользователей в список. Как вы уже наверное поняли при запуске пользователи будут добавлять в список не в том порядке, в котором были созданы и переданы в метод .new_name(), а в случайном и каждый раз этот порядок может быть разным, ну разумеется чем больше входных данных тем более непредсказуемый результат может сформироваться.
Запустим программу
Список пользователей в начале - ['vova', 'jora'] Новый пользователь - Dflb Новый пользователь - urj Новый пользователь - wkU Пользователь urj добавлен Список пользователей после - ['vova', 'jora', 'urj'] Пользователь wkU добавлен Пользователь Dflb добавлен Список пользователей после - ['vova', 'jora', 'urj', 'wkU', 'Dflb'] Список пользователей после - ['vova', 'jora', 'urj', 'wkU', 'Dflb'] Process finished with exit code 0
Создаются пользователи в одном порядке, а добавляются в другом, таким образом если бы этот порядок был нам очень важен, то программа выполняла бы свою задачу некорректно, а если бы в качестве входных данных был не пополняемый список, а допустим изменяемое одно единственное число, то результат нас бы точно не радовал и программа также работала бы не корректно.
Думаю проблема озвучена и представлена понятно, и поскольку проблема достаточно яркая и заметная, то и решение для нее есть очень простое и лаконичное.
import random import string import threading import time class ListOfUsers: """Класс для хранения списка пользователей""" def __init__(self): # Устанавливаем список начальных пользователей self.list_of_names = ['vova', 'jora'] self.locker = threading.Lock() def new_name(self, user_name: str): """Функция для добавления нового пользователя""" # Блокируем доступ для остальных потоков пока фрагмент кода занят self.locker.acquire() print(f'Новый пользователь - {user_name}') # Имитируем задержку добавления для провоцирования context switch time.sleep(2) self.list_of_names.append(user_name) print(f'Пользователь {user_name} добавлен') print(f'Список пользователей после - {lock_example.list_of_names}') # Выпускам поток, тем самым давая возможность войти новому self.locker.release() if __name__ == '__main__': # Создаем экземпляр и показываем начальный список lock_example = ListOfUsers() print(f'Список пользователей в начале - {lock_example.list_of_names} ') # Забираем в переменную alphabets все буквы alphabets = string.ascii_letters # Три раза формируем разное случайное имя и выделяем 3 потока на добавление # их в список с помощью метода new_name() for _ in range(3): new_name = ''.join(random.choice(alphabets) for i in range(random.randint(2, 5))) threading.Thread(target=lock_example.new_name, args=[new_name]).start()
Список пользователей в начале - ['vova', 'jora'] Новый пользователь - yB Пользователь yB добавлен Список пользователей после - ['vova', 'jora', 'yB'] Новый пользователь - Fzf Пользователь Fzf добавлен Список пользователей после - ['vova', 'jora', 'yB', 'Fzf'] Новый пользователь - kVfCl Пользователь kVfCl добавлен Список пользователей после - ['vova', 'jora', 'yB', 'Fzf', 'kVfCl'] Process finished with exit code 0
Создаем блокиратор как экземпляр класса Lock(). Метод .acquire() для установки блокировки, метод .release() для снятия блокировки. Метод .acquire() поддерживает параметр timeout, в которое передается время, через которое блокировка будет снята, при этом неважно был ли применен метод .release() или нет.
Теперь как видно потоки последовательно используют метод. Первый поток взялся за метод и стал применять его к переменной, спокойно сделал в нем все свои дела и вышел, тем самым освободив место для следующего потока, не переживая, что пока он занят тут своими делами придет другой поток, отнимет у него эту переменную и применит к ней метод.
Lock() должен закрываться, метод .release() и зачастую методы, которые должны закрываться поддерживают with и Lock не исключение.
... def new_name(self, user_name: str): """Функция для добавления нового пользователя""" # Блокируем доступ для остальных потоков пока фрагмент кода занят with self.locker: print(f'Новый пользователь - {user_name}') # Имитируем задержку добавления для провоцирования context switch time.sleep(2) self.list_of_names.append(user_name) print(f'Пользователь {user_name} добавлен') print(f'Список пользователей после - {lock_example.list_of_names}') ...
Использовать with можно вот таким образом.
Помимо Lock() существует RLock(). Реализация представлена также методами .acquire() и .release() и также блокирует фрагмент кода для одного потока. Отличие заключается в том, что RLock() решает проблему повторяющейся блокировки. Что под этим имеется ввиду. Допустим есть поток блокирующий фрагмент методом .acquire() и пока фрагмент не освободится методом .release() новый поток не приступит к выполнению этого кода, и из-за неграмотно написанного кода может получиться ситуация, что на один фрагмент повесятся два .acquire() и исполнение заблокируется, до .release() мы уже никогда не дойдем. Или поток, который использует .acquire() вызывается еще раз в каком-то другом фрагменте, мы про это забыли, а он также повесит .acquire() и зависнет навсегда. Как раз RLock сам отлавливает такие ситуации и если он видит, что фрагмент сам себя заблокировал (это состояние называют Deadlock) разблокирует его и фрагмент успешно исполнится.
... def new_name(self, user_name: str): """Функция для добавления нового пользователя""" # Блокируем доступ для остальных потоков пока фрагмент кода занят self.locker.acquire() print(f'Новый пользователь - {user_name}') # Имитируем задержку добавления для провоцирования context switch time.sleep(2) # Блокируем доступ повторно self.locker.acquire() self.list_of_names.append(user_name) print(f'Пользователь {user_name} добавлен') print(f'Список пользователей после - {lock_example.list_of_names}') self.locker.release() ...
Список пользователей в начале - ['vova', 'jora'] Новый пользователь - AR
В нашем примере это можно вызвать так, пусть после sleep() стоит еще одна блокировка, не важно как она могла бы там получиться, но в реальности методы сложнее чем в этих примерах, поэтому на этом месте мог быть вызов еще какого-нибудь метода, где использовался этот поток и мы бы оказались в такой ситуации. Все, мы увидели две строчки и программа будет висеть в таком состоянии пока мы ее вручную не остановим. Если заменить Lock(), на RLock() и больше ничего не менять, то программа без каких-либо трудностей отработает. Либо мы могли добавить в .acquire() параметр timeout и через время из timeout программа бы исполнилась, но предпочтительнее все-таки использовать RLock() в подобных ситуациях.
Condition
Последний класс для работы с синхронизацией потоков - Condition(). В своей реализации напоминает Event(), мы также имеем флаг True False и метод .wait(), фрагмент кода после которого не начнет исполняться, пока флаг не примет значение True. Но особенность в том, что как только фрагмент после .wait() выполнен, то флаг опять становится в значение False и мы снова должны ждать пока что-то спровоцирует переключение состояния флага.
Пример может выглядеть так
import random import threading import string class GiftsSend: """В классе имитируем отправку подарков для победителей, но создаем условие, что имя победителя должно быть написано с заглавной буквы""" def __init__(self, count_winners): # Передаем число победителей self.count_winners = count_winners # Создаем экземпляр Condition() self.condition = threading.Condition() # Забираем в переменную все заглавные буквы self.upper_alphabets = string.ascii_uppercase def who_win(self, winner_name): print(f'Проверяем {winner_name} на соответствие') # with заменяет .acquire() и .release() with self.condition: # Проверяем, что первая буква большая if winner_name[0] in self.upper_alphabets: print(f'{winner_name} получает подарок') # Если первая буква маленькая ставим на ожидание else: # Через 5 секунд ожидание разблокируется # и мы увидим имена, которые не получили подарок self.condition.wait(timeout=5) print(f'{winner_name} Без подарка') if __name__ == '__main__': # Генерируем 10 победителей со случайными именами # И запускаем для каждого в отдельном потоке .who_win() send_example = GiftsSend(10) alphabets = string.ascii_letters for winner in range(send_example.count_winners): name = ''.join(random.choice(alphabets) for i in range(random.randint(2, 5))) send_thread = threading.Thread(target=send_example.who_win, args=[name], name=name) send_thread.start()
Проверяем uNHCA на соответствие Проверяем MxYdB на соответствие MxYdB получает подарок Проверяем sVU на соответствие Проверяем NEY на соответствие NEY получает подарок Проверяем TvC на соответствие Проверяем RhP на соответствие RhP получает подарок Проверяем wmudR на соответствие Проверяем HJK на соответствие TvC получает подарок Проверяем Kh на соответствие Kh получает подарок Проверяем icr на соответствие HJK получает подарок uNHCA Без подарка sVU Без подарка wmudR Без подарка icr Без подарка Process finished with exit code 0
В классе GiftsSend создадим экземпляр Condition() и 2 вспомогательные переменные. Создадим метод who_win(), который будет проверять переданное в него имя, должно быть соблюдено условие, что первая буква имени заглавная, иначе победитель переходит в .wait() и если бы там не был явно указан параметр timeout, то находился бы он в этом .wait() пока мы вручную не остановили программу.
Вместо with можно было использовать методы .acquire() и .release().
В результате мы видим, что сначала проверяются все имена, те что начинаются с большой буквы выдаются сразу, а те что с маленькой ждут пока будут выданы подарки всем заслуживающим и только после этого получают сообщение, что подарок они не получают.
Но если timeout не задавать, то программа бы работала пока мы ее не остановили вручную, конечно, такого поведения нам бы хотелось избежать. Для этого немного изменим программу.
import random import threading import string import time class GiftsSend: """В классе имитируем отправку подарков для победителей, но создаем условие, что имя победителя должно быть написано с заглавной буквы""" def __init__(self, count_winners): # Передаем число победителей self.count_winners = count_winners # Создаем экземпляр Condition() self.condition = threading.Condition() # Забираем в переменную все заглавные буквы self.upper_alphabets = string.ascii_uppercase def who_win(self, winner_name): print(f'Проверяем {winner_name} на соответствие') # with заменяет .acquire() и .release() with self.condition: # Проверяем, что первая буква большая if winner_name[0] in self.upper_alphabets: print(f'{winner_name} получает подарок') # Если первая буква маленькая ставим на ожидание else: # Через 5 секунд ожидание разблокируется # и мы увидим имена, которые не получили подарок self.condition.wait() print(f'{winner_name} Без подарка') if __name__ == '__main__': # Генерируем 10 победителей со случайными именами # И запускаем для каждого в отдельном потоке .who_win() send_example = GiftsSend(10) alphabets = string.ascii_letters upper_alphabets = string.ascii_uppercase for winner in range(send_example.count_winners): name = ''.join(random.choice(alphabets) for i in range(random.randint(2, 5))) send_thread = threading.Thread(target=send_example.who_win, args=[name], name=name) send_thread.start() time.sleep(1) # Проверяем каждое имя, если первая буква маленькая - снимаем .wait() if send_thread.name[0] not in upper_alphabets: with send_example.condition: send_example.condition.notify()
Проверяем iNeSY на соответствие iNeSY Без подарка Проверяем fxB на соответствие fxB Без подарка Проверяем DzHY на соответствие DzHY получает подарок Проверяем wrj на соответствие Проверяем OsDJG на соответствие wrj Без подарка OsDJG получает подарок Проверяем fN на соответствие fN Без подарка Проверяем PS на соответствие PS получает подарок Проверяем lPY на соответствие lPY Без подарка Проверяем XBpof на соответствие XBpof получает подарок Проверяем qXFzj на соответствие qXFzj Без подарка Process finished with exit code 0
Для снятия блокировки есть методы .notify() и .notify_all(). Добавим в программу проверку первой буквы каждого имени и будем разблокировать .wait() для таких имен, таким образом мы сразу сортируем все имена и сразу решаем кому отправлять подарок, а кому нет. .notify_all() разблокирует все .wait(), а .notify() можно передать число потоков, которое должно быть снято с .wait().
Таким образом, мы достаточно подробно разобрались с потоками и с возможностями для взаимодействия с ними.
Осталось последняя тема, о которой хотелось бы поговорить внутри данного материала - Многопроцессорность.
Многопроцессорность
multiprocessing
Для работы с процессами используется модуль multiprocessing стандартного набора библиотек python. Процессы, в отличие от потоков, уже помогут нам повлиять на скорость работы программы. И многие синтаксические особенности, на самом деле, очень схожи с threading.
Сразу напишем программу
import multiprocessing import random import string import os class ListOfUsers: """Класс для хранения списка пользователей""" def __init__(self): # Устанавливаем список начальных пользователей self.users = {} def new_user(self, name, password): # Добавление нового пользователя в словарь self.users[name] = password # Вывод информации о процессе print(f'\nid процесса: {os.getpid()}' f'\nРодительский процесс - {os.getppid()}' f'\nИмя процесса - {multiprocessing.current_process().name}') # Вывод пользователей после работы метода внутри процесса print(self.users) if __name__ == '__main__': # Экземпляр класса ex_list = ListOfUsers() # Переменная со всеми буквами, для генерации имени alphabets = string.ascii_letters # Переменная со всеми буквами + цифрами, для генерации пароля alphabets_and_digits = string.ascii_letters + string.digits # Информация до начала работы print(f'Словарь пользователей вначале - {ex_list.users}\n' f'Количестве ядер процессора устройства - {multiprocessing.cpu_count()}') # Генерация имен, паролей и запуск .new_user() с этими данными в разных процессах for process in range(3): new_name = ''.join(random.choice(alphabets) for i in range(random.randint(3, 6))) new_password = ''.join(random.sample(alphabets_and_digits, random.randint(6, 9))) multiprocessing.Process(target=ex_list.new_user, args=(new_name, new_password)).start()
Словарь пользователей вначале - {} Количестве ядер процессора устройства - 8 id процесса: 10170 Родительский процесс - 10169 Имя процесса - Process-1 {'aEdMI': 'QT247jg'} id процесса: 10171 Родительский процесс - 10169 Имя процесса - Process-2 {'tdSz': 'JlBmG0'} id процесса: 10172 Родительский процесс - 10169 Имя процесса - Process-3 {'mIbT': '7ol2MwF'} Process finished with exit code 0
Создается процесс, как и поток, только вместо threading.Thread используется multiprocessing.Process, атрибуты и их назначение у потоков и процессов полностью одинаковые, процесс также имеет параметр daemon, также работает, если к нему применен .join() и также бере на себя выполнение программы в него переданной.
Поскольку процессы напрямую связаны с нашим устройством часто для просмотра какой-то информации с процессами связанной мы будем использовать модуль os. Так в данном примере с помощью os мы смотрим id текущего потока, а также выводим id родительского процесса. В данном случае родительский это точка входа if __name__ == '__main__', процесс, который бы запускался и без явного указания, что этот процесс стоит запустить.
Имя процесса как видно формируется автоматически, если явно его не указывать. А через multiprocessing мы можем обратиться к системным атрибутам, например, вывести число ядер процессора.
Но что самое примечательное в этом примере это пожалуй вывод результата, из него становится ясно, что класс ListOfUsers запускается 3 раза, и каждый раз новый пользователь добавляется в пустой словарь. Тут мы явно можем увидеть разницу между процессами и потоками. Ведь, если сейчас заменить запуск в процессах на заупск в потоках, то результат будет следующий
import multiprocessing import threading import random import string import os class ListOfUsers: """Класс для хранения списка пользователей""" def __init__(self): # Устанавливаем список начальных пользователей self.users = {} def new_user(self, name, password): # Добавление нового пользователя в словарь self.users[name] = password # Вывод информации о процессе print(f'\nid процесса: {os.getpid()}' f'\nРодительский процесс - {os.getppid()}' f'\nИмя потока - {threading.current_thread().name}' f'\nИмя процесса - {multiprocessing.current_process().name}') # Вывод пользователей после работы метода внутри процесса print(self.users) if __name__ == '__main__': # Экземпляр класса ex_list = ListOfUsers() # Переменная со всеми буквами, для генерации имени alphabets = string.ascii_letters # Переменная со всеми буквами + цифрами, для генерации пароля alphabets_and_digits = string.ascii_letters + string.digits # Информация до начала работы print(f'Словарь пользователей вначале - {ex_list.users}\n' f'Количестве ядер процессора устройства - {multiprocessing.cpu_count()}') # Генерация имен, паролей и запуск .new_user() с этими данными в разных процессах for process in range(3): new_name = ''.join(random.choice(alphabets) for i in range(random.randint(3, 6))) new_password = ''.join(random.sample(alphabets_and_digits, random.randint(6, 9))) # multiprocessing.Process(target=ex_list.new_user, args=(new_name, new_password)).start() threading.Thread(target=ex_list.new_user, args=(new_name, new_password)).start()
Словарь пользователей вначале - {} Количестве ядер процессора устройства - 8 id процесса: 11061 Родительский процесс - 6775 Имя потока - Thread-1 (new_user) Имя процесса - MainProcess {'jFitS': 'dOT3fQ4'} id процесса: 11061 Родительский процесс - 6775 Имя потока - Thread-2 (new_user) Имя процесса - MainProcess {'jFitS': 'dOT3fQ4', 'OcWrXE': 'VwchbnjqQ'} id процесса: 11061 Родительский процесс - 6775 Имя потока - Thread-3 (new_user) Имя процесса - MainProcess {'jFitS': 'dOT3fQ4', 'OcWrXE': 'VwchbnjqQ', 'wcsBz': 'fyWwdj9J'} Process finished with exit code 0
Используя потоки мы запускам один процесс и внутри него 3 раза влияем на словарь пользователей, таким образом в итоге мы получаем один словарь, а не три разных.
Поэтому, используя процессы, мы не упираемся в GIL, точнее упираемся внутри каждого процесса, но этих процессов может быть несколько, таким образом каждый из процессов может захватить отдельный фрагмент кода и заняться его выполнение параллельно с тем, как другие процессы будут заниматься своими фрагментами кода.
Queue
Часть примитивов взаимодействия нам знакомы из модуля threading, например Lock() в multiprocessing ведет себя также, как и в threading, за небольших исключением синтаксиса вызова, о котором я расскажу в примере. Но помимо знакомых примитивов multiprocessing имеет уникальные примитивы, с которыми мы ранее не сталкивались. Начнем знакомство с такого класса как очередь - Queue(). Мы не станем пример, воспользуемся тем же, что писали выше.
Сформулируем проблему. Используя процессы, мы запускам три разных класса и получаем три разных словаря с одной парой ключ значение в каждом, а что если бы мы хотели запустить три этих процесса и результат каждого класть в некое 'хранилище', из которого мы могли бы после все эти результаты забрать и объединить. И реализовать это можно как раз через Queue.
import multiprocessing import random import string import os import time class ListOfUsers: """Класс для хранения списка пользователей""" def __init__(self): # Устанавливаем список начальных пользователей self.users = {} # Добавим экземпляр Lock() self.locker = multiprocessing.Lock() def new_user(self, name, password, queue): # Добавление нового пользователя в словарь. # Используем блокировку, чтобы with self.locker: self.users[name] = password # Вывод информации о процессе print(f'\nid процесса: {os.getpid()}' f'\nРодительский процесс - {os.getppid()}' f'\nИмя процесса - {multiprocessing.current_process().name}') # Вывод пользователей после работы метода внутри процесса queue.put(self.users) print(self.users) time.sleep(0.4) if __name__ == '__main__': # Экземпляр класса ex_list = ListOfUsers() # Экземпляр очереди q = multiprocessing.Queue() # Переменная со всеми буквами, для генерации имени alphabets = string.ascii_letters # Переменная со всеми буквами + цифрами, для генерации пароля alphabets_and_digits = string.ascii_letters + string.digits # Информация до начала работы print(f'Словарь пользователей вначале - {ex_list.users}\n' f'Количестве ядер процессора устройства - {multiprocessing.cpu_count()}') # Список для хранения потоков processes_list = [] # Генерация имен, паролей и запуск .new_user() с этими данными в разных процессах for process in range(3): new_name = ''.join(random.choice(alphabets) for i in range(random.randint(3, 6))) new_password = ''.join(random.sample(alphabets_and_digits, random.randint(6, 9))) new_process = multiprocessing.Process(target=ex_list.new_user, args=(new_name, new_password, q)) # Добавление каждого нового процесса в список processes_list processes_list.append(new_process) new_process.start() # Применяем .join() к каждому процессу for i in processes_list: i.join() # Метод .get() извлекает объект из очереди, если очередь закрыта - вызывается ValueError print('\nФормируем словарь пользователей из данных, которые хранятся в очереди') new_users_dict = {} for obj in iter(q.get, None): new_users_dict.update(obj) print(new_users_dict)
Словарь пользователей вначале - {} Количестве ядер процессора устройства - 8 id процесса: 22633 Родительский процесс - 22632 Имя процесса - Process-1 {'MnzZnU': 'Oi9QT1xe'} id процесса: 22635 Родительский процесс - 22632 Имя процесса - Process-2 {'oeNxaS': 'dKMBaI'} id процесса: 22636 Родительский процесс - 22632 Имя процесса - Process-3 {'zEgs': 'j3glsXEd'} Формируем словарь пользователей из данных, которые хранятся в очереди {'MnzZnU': 'Oi9QT1xe'} {'MnzZnU': 'Oi9QT1xe', 'oeNxaS': 'dKMBaI'} {'MnzZnU': 'Oi9QT1xe', 'oeNxaS': 'dKMBaI', 'zEgs': 'j3glsXEd'}
Создается объект Queue() через экземпляр. В данном примере сразу посмотрим и на Queue() и Lock(). У использования подобных примитивов в multiprocessing есть одна особенность, используемый примитив должен быть явно передан в качестве аргумента, передаваемой в процесс функции. Как вы могли заметить q явно передан в метод new_user(), в качестве аргумента queue. Lock() при этом в качестве аргумента не передан, потому что создается в инициализаторе и уже существует в списке аргументов экземпляра. Если бы мы хотели объявить Lock() на уровне с Queue(), допустим, прямо строчкой ниже, то Lock() тоже требовалось бы явно передавать в качестве аргумента.
Queue() умеет класть что-то в очередь - метод .put() и забирать что-то из очереди - метод .get(). В нашем примере экземпляр Queue() существует в MainProcess, поэтому класть в нее что-то могут все дочерние процессы, а забирать это что-то мы можем через MainProcess.
В main мы по прежнему запускаем 3 процесса и каждый, в данном примере, передаем в список, а после запускаем, это нужно нам для того, чтобы удобно применить ко всем процессам метод .join(), тем самым мы говорим, что программа будет работать пока каждый этих процессов не завершится. Это нужно для корректного забирания данных из очереди, если мы не будем использовать .join(), мы в итоге все равно увидим словарь из трех пар, но порядок будет не такой, как задумывалось.
Словарь пользователей вначале - {}
Количестве ядер процессора устройства - 8
id процесса: 23344
Родительский процесс - 23343
Имя процесса - Process-1
Формируем словарь пользователей из данных, которые хранятся в очереди
{'jPIwwn': 'cIfHW6p'}
{'jPIwwn': 'cIfHW6p'}
id процесса: 23345
Родительский процесс - 23343
Имя процесса - Process-2
{'FmCa': 'oqs1QZHxG'}
{'jPIwwn': 'cIfHW6p', 'FmCa': 'oqs1QZHxG'}
id процесса: 23346
Родительский процесс - 23343
Имя процесса - Process-3
{'ckIuG': 'NeQirtpT'}
{'jPIwwn': 'cIfHW6p', 'FmCa': 'oqs1QZHxG', 'ckIuG': 'NeQirtpT'}
Выглядеть это будет вот таким образов, без .join() мы заходим в последний цикл после первой итерации, а с .join() только после последней.
В методе new_user() мы добавляем словарь с одной парой в очередь методом .put(), а в main в цикле с использованием метода .iter() мы это значение забираем с помощью .get() и с помощью .update() добавляем в новый словарь для хранения результата работы каждого из процессов.
Таким образом мы получаем пополняемую лесенку значений словаря.
Но как можно заметить программа не завершена, мы не видим строку Process finished with exit code 0. И завершать нам ее придется вручную. Происходит это потому что main процесс все еще активен. Тяжело отследить, что именно его держит в данной ситуации, но мы можем остановить его вручную для прекращения работы программы.
... # Метод .get() извлекает объект из очереди, если очередь закрыта - вызывается ValueError print('\nФормируем словарь пользователей из данных, которые хранятся в очереди') new_users_dict = {} for obj in iter(q.get, None): new_users_dict.update(obj) print(new_users_dict) print(q.empty()) time.sleep(0.2) if q.empty(): q.close() raise ValueError(f'Принудительно остановил процесс {multiprocessing.current_process()}')
Словарь пользователей вначале - {} Количестве ядер процессора устройства - 8 id процесса: 23965 Родительский процесс - 23964 Имя процесса - Process-1 {'vCX': 'PjFyGktJR'} id процесса: 23966 Родительский процесс - 23964 Имя процесса - Process-2 {'VSQMsD': 'T046oO'} id процесса: 23968 Родительский процесс - 23964 Имя процесса - Process-3 {'jGuCt': '0pJjDg8'} Формируем словарь пользователей из данных, которые хранятся в очереди {'vCX': 'PjFyGktJR'} False {'vCX': 'PjFyGktJR', 'VSQMsD': 'T046oO'} False {'vCX': 'PjFyGktJR', 'VSQMsD': 'T046oO', 'jGuCt': '0pJjDg8'} True Traceback (most recent call last): File "/home/tsarkoilya/kavo/PycharmProjects/forsite/asyncio/multiprocessing_example_one.py", line 73, in <module> raise ValueError(f'Принудительно остановил процесс {multiprocessing.current_process()}') ValueError: Принудительно остановил процесс <_MainProcess name='MainProcess' parent=None started> Process finished with exit code 1
Для ручного прибивания можно использовать метод .empty() для проверки наличия объектов в очереди и после того как .empty() возвращает True (то есть очередь пуста) вызывать raise. Можно использовать метод .close(), который провоцирует ValueError, но когда мы используем raise, можно и не писать .close() явно. В выводе raise мы видим что остановился MainProcess как самый главный и действующий на данный момент. Три дочерних процесса, запущенных выше перестают существовать как только отдают результат в .put().
Очередь может принимать параметр maxsize, куда передается максимальное доступное элементов очереди.
У .put() и .get() есть аналоги .put_nowait() и .get_nowait(), эти методы обрабатывают еще и отсутствие передаваемого или забираемого объекта. Так, используя .get_nowait(), можно переписать наш костыль завершения программы.
... print('\nФормируем словарь пользователей из данных, которые хранятся в очереди') new_users_dict = {} try: for obj in iter(q.get_nowait, None): new_users_dict.update(obj) except Exception: print(f'Итоговый словарь - {new_users_dict}')
Словарь пользователей вначале - {} Количестве ядер процессора устройства - 8 id процесса: 26261 Родительский процесс - 26260 Имя процесса - Process-1 {'bYcFqy': 'aFkYOXi'} id процесса: 26262 Родительский процесс - 26260 Имя процесса - Process-2 {'TlXHTx': '3heWbw'} id процесса: 26264 Родительский процесс - 26260 Имя процесса - Process-3 {'IGQXY': '26Pp5NWq'} Формируем словарь пользователей из данных, которые хранятся в очереди Итоговый словарь - {'bYcFqy': 'aFkYOXi', 'TlXHTx': '3heWbw', 'IGQXY': '26Pp5NWq'} Process finished with exit code 0
При использовании .get_nowait() последняя итерация с .empty() == True будет вызывать raise Empty, мы можем ловить этот момент в try except и просто возвращать итоговый результат по завершению.
Pipe
Обмениваться данными между процессами можно не только через Queue(), существует еще один примитив для этих целей - Pipe()
Особенность Pipe() в том, что возвращается кортеж каналов, один для приема данных, второй для отправки. Таким образом мы можем положить в Pipe() какой-то результат методом .send(), а забрать методом .recv(). Метод .send() будет применен к input каналу, а метод .recv() - к output.
Концы канал равносильны по значимости, то есть мы можем использовать в качестве input канала как первое так и второе значение кортежа, с output каналом ситуация аналогичная.
import multiprocessing class Compound: def __init__(self, info): self.info = info def example_send_info(self, pipe): """Через канал для передачи данных помещаем в pipe переданную строку""" pipe.send(self.info) pipe.close() if __name__ == '__main__': ex_compound = Compound('some info') # Экземпляр Pipe() содержит канал для передачи данных и для получения o_p, i_p = multiprocessing.Pipe() # В экземпляр в качестве аргумента отдаем канала input() pr = multiprocessing.Process(target=ex_compound.example_send_info, args=(i_p,)) pr.start() # Получаем данные из Pipe() через канал output() print(f'{o_p.recv()}')
some info Process finished with exit code 0
Создаем экземпляр Pipe(), распаковываем объект в переменные и переменную, которую мы бы хотели использовать как input канал отдаем вместе с аргументами.
После отправки данных в Pipe() соединение закрывается методом .close(), этого можно в данном случе не делать, результат от этого никак не изменится, но если мы представим, что к Pipe() могут подключиться нежелательные процессы, то лучше, конечно, закрывать соединение после того как предполагается передача в Pipe() всех интересующих нас данных.
Ну а после через канал, который предполагается как output мы эти данные можем получить.
import multiprocessing class Compound: """Класс для хранения списка пользователей""" def __init__(self, info): self.info = info def example_send_info(self, pipe): """Через канал для передачи данных помещаем в pipe переданную строку""" pipe.send(self.info) pipe.close() if __name__ == '__main__': ex_compound = Compound('some info') # Экземпляр Pipe() содержит канал для передачи данных и для получения o_p, i_p = multiprocessing.Pipe() # В экземпляр в качестве аргумента отдаем канала input() for pr in range(2): multiprocessing.Process(target=ex_compound.example_send_info, args=(i_p,)).start() # Получаем данные из Pipe() через канал output() print(o_p.recv()) print(o_p.fileno()) print(i_p.fileno()) print(o_p.poll())
some info 3 4 False some info 3 4 False Process finished with exit code 0
Можно запустить также несколько процессов и для каждого можно использовать один Pipe().
Передавать данные можно в байтовых данных, для этого используются методы .send_bytes() и .recv_bytes()
.fileno() - возвращает файловый дескриптор
.poll() - проверяет есть ли в Pipe() какие-нибудь данные доступные для чтения.
Pool
Существует класс для работы с пулом процессов - Pool(). Чем-то напоминает concurrent.futures.ThreadPoolExecutor, который мы рассматривали, когда говорили о потоках, у ThreadPoolExecutor есть аналог для процессов ProcessPoolExecutor, логика работы которого примерно та же. Можно использовать и его, но у нас есть класс от multiprocessing специально для этих же задач.
Особенность Pool() в удобной работе с несколькими потоками.
import multiprocessing import random import string class Compound: """Класс для хранения списка пользователей""" def __init__(self, main_p_name): self.name = main_p_name @staticmethod def get_result(result): """Метод для демонстрации Pool().apply()""" current_p = multiprocessing.current_process().name print(f'Данные - {result}' f'\nТекущий процесс - {current_p}') @staticmethod def get__other_result(other_result): """Метод для демонстрации Pool().apply()""" current_p = multiprocessing.current_process().name print(f'Данные - {other_result}' f'\nТекущий процесс - {current_p}') if __name__ == '__main__': # Все буквы, из них сгенерируем случайный список alphabets = string.ascii_letters # Служебная информация ex_p = Compound(multiprocessing.current_process().name) print(f'Главный процесс - {ex_p.name}') # Запуск процессов через Pool() with multiprocessing.Pool(processes=2) as p: # Генерация списка букв разбитых по одной rand_info = ''.join(random.choice(alphabets) for i in range(random.randint(3, 6))) # .apply() для запуска одного процесса p.apply(ex_p.get_result, [rand_info]) p.apply(ex_p.get__other_result, [rand_info])
Главный процесс - MainProcess Данные - GTV Текущий процесс - ForkPoolWorker-1 Данные - GTV Текущий процесс - ForkPoolWorker-2 Process finished with exit code 0
Создавать Poll() будем через with, в качестве аргумента передаем количество потоков, которое мы готовы выделить на данный пул. Запускать процессы из пула мы можем разные, используется для этого метод .apply(), куда передается функция и аргументы, при необходимости. Так напишем два статических метода и запустим их в одном пуле, выделив на эту задачу как раз 2 процесса. В результате мы видим, что каждый процесс выполнил отдельную задачу. Если бы мы передали в processes единицу, то один процесс сначала выполнил бы первый метод, а потом второй.
.apply() имеет асинхронный аналог - .apply_async(), который поддерживает callback, он будет вызываться как только процесс завершится.
Если мы хотим послать пул процессов не на разные задачи, а на одну, то .apply() нам, конечно, не подойдет.
Для такой задачи есть метод - .map().
import multiprocessing import string class Compound: """Класс для хранения списка пользователей""" def __init__(self, main_p_name): self.name = main_p_name @staticmethod def get_result(result): """Метод для демонстрации Pool().map()""" current_p = multiprocessing.current_process().name print(f'Данные - {result}' f'\nТекущий процесс - {current_p}') if __name__ == '__main__': # Все буквы, из них сгенерируем случайный список alphabets = string.ascii_letters # Служебная информация ex_p = Compound(multiprocessing.current_process().name) print(f'Главный процесс - {ex_p.name}') # Запуск процессов через Pool() with multiprocessing.Pool(processes=4) as p: # Генерация списка букв разбитых по одной r_list = [rand_info for rand_info in alphabets] # .map() для объединенного запуска всех процессов p.map(ex_p.get_result, r_list[:3])
Главный процесс - MainProcess Данные - b Текущий процесс - ForkPoolWorker-2 Данные - c Текущий процесс - ForkPoolWorker-3 Данные - a Текущий процесс - ForkPoolWorker-1 Process finished with exit code 0
В .map() передаем функцию и аргумент, в качестве, которого ожидается итерируемый объект. Выделим на этот пул четыре процесса и в результате видим, что задействовано только 3.
Также как и .apply() имеет аналог .map_async() с поддержкой callback.
import multiprocessing import string class Compound: """Класс для хранения списка пользователей""" def __init__(self, main_p_name): self.name = main_p_name @staticmethod def get_result(result): """Метод для демонстрации Pool().map()""" current_p = multiprocessing.current_process().name print(f'Данные - {result}' f'\nТекущий процесс - {current_p}') return result if __name__ == '__main__': # Все буквы, из них сгенерируем случайный список alphabets = string.ascii_letters # Служебная информация ex_p = Compound(multiprocessing.current_process().name) print(f'Главный процесс - {ex_p.name}') # Запуск процессов через Pool() with multiprocessing.Pool(processes=4) as p: # Генерация списка букв разбитых по одной r_list = [rand_info for rand_info in alphabets] # .map() для объединенного запуска всех процессов p.map_async(ex_p.get_result, r_list[:3], callback=ex_p.get_result) # В случае асинхронных аналогов процессы нужно закрывать и джоинить p.close() p.join()
Главный процесс - MainProcess Данные - a Текущий процесс - ForkPoolWorker-1 Данные - b Текущий процесс - ForkPoolWorker-2 Данные - c Текущий процесс - ForkPoolWorker-3 Данные - ['a', 'b', 'c'] Текущий процесс - MainProcess Process finished with exit code 0
Для работы с .map_async() и .apply_async() нужно закрывать процесс и дожидаться его завершения, методы .close() и .join(). Если их не использовать, то вся программа завершиться с первым процессом.
callback должен принимать вызываемый объект. Соответственно этот объект должен что-то возвращать, дополнительных методов писать не станем, просто возвратим результат из уже существующего, и в итоге callback возвращает нам список результатов каждого процесса.
Pool() поддерживает также методы .imap() и .imap_unordered(). Работают они почти как .map(). Различие в том, что если мы захотим вручную выводить результат каждой итерации методов .map() и .imap() методом .next(), то в случае .imap() мы сможем передать параметр timeout в .next(), который вызовет TimeoutError, если результат не сможет быть выведен за переданное время. А .imap_unordered() отличается от .imap() тем, что будет выводить результат по мере поступления.
И еще два доступных метода - .starmap() и .starmap_async(). В отличии от .map() в качестве итератора принимают список кортежей, которые позиционно распаковываются в аргументы функции. Например, функция get_result() теперь имеет аргументы result и other_result, тогда мы можем написать
p.starmap(get_result, [(result_1, result_2), (other_result_1, other_result_2)])
Тогда при распаковке выполнятся два процесса, которые в качестве аргументов примут аргументы распакованных кортежей позиционно.
Value. Array
Еще два примитива для обработки одних данных из разны процессов - Value() и Array(). Особенность в том, что мы можем создать объекты этих примитивов, как-то повлиять на их значения из разных процессов и получить в итоге результат воздействия всех процессов.
Array() представляет собой массив данных, Value() - одно единственное значение.
Мы можем сразу в одной программе объявить эти примитивы и повзаимодействовать с ними
import multiprocessing import random class DigitWorks: def __init__(self, number): self.number = number # Lock(), поскольку участвуют несколько процессов self.locker = multiprocessing.Lock() def count_of_process(self, d_array, n): cur_pr = multiprocessing.current_process().pid # Используем Lock() with self.locker: # Добавляем id процесса в массив d_array[n] = cur_pr @staticmethod def calculations(d_value): """Статический метод для проверки Value на четность""" print(f'\nЧисло - {d_value}') if d_value % 2 == 0: print('Четное') else: print('Нечетное') if __name__ == '__main__': rand_number = random.randint(1, 5) ex = DigitWorks(rand_number) # Создаем объекты Array() и Value() array = multiprocessing.Array('i', rand_number) value = multiprocessing.Value('d', rand_number) print(f'Работаем в - {multiprocessing.current_process().name}') # Value() будем использовать один раз внутри процесса pr_for_value = multiprocessing.Process(target=ex.calculations, args=(rand_number,)) pr_for_value.start() pr_for_value.join() # Array() будем использовать несколько раз внутри разных процессов process = [] for i in range(rand_number): pr = multiprocessing.Process(target=ex.count_of_process, args=(array, i)) process.append(pr) pr.start() [proc.join() for proc in process] # Вывод итоговых результатов print(f'\nСписок id процессов - {list(array)}' f'\nКоличество процессов - {len(list(array))}')
Работаем в - MainProcess Число - 5 Нечетное Список id процессов - [56851, 56852, 56853, 56854, 56855] Количество процессов - 5 Process finished with exit code 0
Создадим объекты каждого примитива и функции внутри класса для их изменения. Первым делом посмотрим на объявление
array = multiprocessing.Array('i', rand_number), i - целочисленный тип данных.
value = multiprocessing.Value('d', rand_number), d - вещественный тип данных
Объявление типа данных, при создании этих объектов, обязательно.
Для объекта Value() будем использовать статическую функцию для проверки на четность, процесс при этом используется отдельный.
Для объекта Array() будет использовать несколько процессов и каждый из процессов будет добавлять в array свой id.
Таким образом, создаются эти объекты внутри основного процесса, а используются внутри дочерних, при этом изменения над объектами внутри дочерних классах отражается в итоговом результате.
Manager
Следующий примитив - Manager(), идейно слегка напоминает идею Value() и Array().
Через объект Manager() мы также можем создавать хранилище для какого-нибудь типа данных и изменять его через другие процессы.
import multiprocessing import random class ExManager: def __init__(self, n): self.n = n # Изменяем объект менеджера из другого процесса def list_append(self, m_list): m_list.append(self.n) if __name__ == '__main__': # Случайное число и передача его в качестве аргумента rand_number = random.randint(1, 30) ex_m = ExManager(rand_number) # Запуск процессов в качестве менеджеров processes = [] with multiprocessing.Manager() as manager: # Менеджер поддерживает разные типы данных, например воспользуемся списком manager_list = manager.list() print(f'До работы - {manager_list}') # Два процесса будут влиять на список for _ in range(2): pr = multiprocessing.Process(target=ex_m.list_append, args=(manager_list,)) processes.append(pr) pr.start() [process.join() for process in processes] # Вывод итогового списка print(f'После работы - {manager_list}')
До работы - [] После работы - [11, 11] Process finished with exit code 0
Manager() поддерживает различные типы данных, на примере создается тип список, точно также можно создавать и прочие типы. Из стандартных поддерживается список и словарь, помимо стандартных поддерживаются примитивы multiprocessing, например Lock(), Queue(), Barrier() и прочие.
Создавать из одно менеджера можно одновременно объявлять хранилища под разные данные, и задействовать их в том процессе, в котором это требуется.
На примере объявляем хранилище типа список, изначально он пустой, но после того как мы в двух разных процессах положили в него значение список изменился и стал содержать в себе два этих значения. Соответственно, если запустить больше процессов, то количество значений в списке будет равно количеству запущенных процессов.
BaseManager
Еще один тип менеджера - BaseManager(). Идея в том, что мы создаем сервер с зарегистрированными функциями и создаем клиентов, которые могу подключаться к этому серверу и использовать его функции, но при условии, что сервер в данный момент запущен и клиент знает данные для подключения.
import multiprocessing.managers import random def get_rand_number(): return random.randint(1, 100) multiprocessing.managers.BaseManager.register('get_r_n', callable=get_rand_number) manager = multiprocessing.managers.BaseManager(address=('', 8080), authkey=b'123') server = manager.get_server() print(server.address) server.serve_forever()
('0.0.0.0', 8080)
import multiprocessing.managers multiprocessing.managers.BaseManager.register('get_r_n') manager = multiprocessing.managers.BaseManager(address=('localhost', 8080), authkey=b'123') manager.connect() result = manager.get_r_n() print(result)
77 Process finished with exit code 0
Создадим два файла.
В multiprocessing_class_basemanager.py регистрируем сервер.
В multiprocessing_class_basemanager_client.py подключаемся к севреру.
Метод .register() принимает имя функции, которое мы придумываем сами, а в параметр callable передаем функцию ассоциированную с этим именем.
Регистрируется сервер с адресом, который состоит из ip и порта, и пароля в бинарном формате.
.get_server() - для получения зарегистрированного сервера
.serve_forever() - для запуска.
При запуске multiprocessing_class_basemanager.py мы видим, что сервер запущен и программа не закрывается, пока мы ее не закроем вручную. Через .address я вывел значение адреса, просто, чтобы вывод не был пустым.
Соединение открыто, можем писать клиентов и подключать к серверу.
В multiprocessing_class_basemanager_client.py нам также необходимо объявить, что за метод мы будем использовать на сервере куда хотим подключиться, далее передаем адрес клиент, ip в данном случае обязателен и пароль.
Для подключения используется метод .connect().
Если сервер запущен и пароль передан верно мы успешно к нему подключимся. Результат интересующей функции можно забрать в переменную и вывести на печать, разумеется на сервере должна быть зарегистрирована функция с таким именем.
Теперь, пока сервер включен, мы можем каждый раз запускать файл multiprocessing_class_basemanager_client.py и получать случайное число от 1 до 100. Либо мы можем создать несколько клиентов и каждый сможет обращаться к серверу и получать случайное число.
Для отправки комментария необходимо авторизоваться
Комментарии
Здесь пока ничего нет...