Образовательный проект «SnakeProject» Михаила Козлова

Навигация

⇒ FreeBSD and Nix ⇐

CISCO

Voice(Asterisk\Cisco)

Microsoft

Powershell

Python

SQL\T-SQL

Общая

WEB Разработка

ORACLE SQL \ JAVA

Мото

Стрельба, пневматика, оружие

Саморазвитие и психология


Асинхронные задачи в Python + Celery + RabbitMQ + Redis



Асинхронные задачи в Python + Celery + RabbitMQ + Redis


По мотивам статьи:
https://levelup.gitconnected.com/asynchronous-tasks-in-python-with-celery-rabbitmq-redis-480f6e506d76

Репозитория:

https://github.com/vjanz/python-asynchronous-tasks


Немного теории

Celery - очередь задач, ориентированная на обработку в реальном времени и поддерживающая планирование задач

RabbitMQ - брокер сообщений, обрабатывает очередь "сообщений" между Django и Celery

Redis - хранилище на основе ключей и значений с быстрым доступом, используется для обработки результатов

В этой статье мы собираемся использовать Celery, RabbitMQ и Redis для создания распределенной очереди задач

Распределенная очередь задач позволяет вам переложить работу на другой процесс, чтобы она выполнялась асинхронно

Как только вы помещаете работу в очередь, то не ждете и параллельно используете другие ядра для обработки задач

Таким образом, это дает возможность выполнять задачи в фоновом режиме, когда приложение решает другие задачи


Варианты использования очередей задач

Самый простой пример - отправка электронных писем после регистрации пользователя

В этом случае вы не знаете, сколько времени потребуется, чтобы отправить электронное письмо пользователю

Это может быть 1 мс, но может быть и больше, а иногда даже не будет отправлено вообще

В этих сценариях вы не знаете, что задача будет успешно выполнена, потому что это сделает провайдер


Категории для очередей задач

Сторонние задачи

Веб-приложение должно обслуживать пользователей, не дожидаясь завершения других действий при загрузке страницы

Например отправки электронного письма или каких-то уведомлений

Длительные задания

Задания, требующие значительных ресурсов, когда пользователи ждут, пока они вычислят свои результаты

Например, выполнение сложного рабочего процесса, генерация графа, обработка видео

Периодические задачи

Задания, которые планируется запускать в определенное время или через определенный интервал

Например, создание ежемесячного отчета или веб-парсер, который запускается два раза в день


Зависимости для Celery

Celery требует транспорта для отправки и получения сообщений

Некоторые кандидаты в качестве брокера сообщений:
RabbitMQ
Redis
Amazon SQS

Мы будем использовать RabbitMQ в качестве брокера

Предпочтительно использовать RabbitMQ, т.к. Celery изначально поддерживает его и просто работает

Когда задачи отправляются брокеру, а затем выполняются celery worker, мы хотим:
сохранить состояние
посмотреть, какие задачи выполнялись ранее

Для этого вам понадобится какое-то хранилище данных, и для этого мы будем использовать Redis

Для хранилищ результата так-же существует много кандидатов:
AMQP, Redis
Memcached,
SQLAlchemy, Django ORM
Apache Cassandra, Elasticsearch, Riak, etc

Для настройки этих сервисов мы будем использовать Docker:
Его легко настроить
Это изолированная среда
Легко воспроизвести среду, когда есть конфигурация (Dockerfile или docker-compose)


Настройка проекта

Создадим новый каталог и необходимые для проекта файлы:
$ mkdir /tmp/mysite && cd /tmp/mysite
$ touch docker-compose.yml requirements.txt tasks.py

Создать и активировать виртуальную среду:
$ python -m venv env
$ source env/bin/activate

Теперь установим зависимости проекта из requirements.txt:

amqp==5.0.6
billiard==3.6.4.0
celery==5.0.5
click==7.1.2
click-didyoumean==0.0.3
click-plugins==1.1.1
click-repl==0.2.0
kombu==5.1.0
prompt-toolkit==3.0.18
pytz==2021.1
redis==3.5.3
six==1.16.0
vine==5.0.0
wcwidth==0.2.5

Для данного проекта нам понадобятся Celery и Redis:
$ pip install celery==5.0.5 redis 

Настроим docker-compose для запуска RabbitMQ и Redis

В файле docker-compose.yaml вставьте следующую конфигурацию YAML:

Здесь мы запускаем две службы:
определяем образы из dockerhub
сопоставляя порты 
добавляем переменные среды

Чтобы узнать, переменные среды для вашего образа - перейдите к документации образа в dockerhub

Инициализируем приложение celery для использования:
RabbitMQ в качестве транспортера сообщений
Redis в качестве хранилища результатов

В tasks.py вставим следующий код:

Мы определили URL-адреса для RabbitMQ и Redis

Затем инициализируем приложение celery, используя эти конфигурации

Первым параметром задачи является имя текущего модуля

Функция say_hello с атрибутом @app.task (помечена как задача, может вызваться с помощью .delay())


Создаем и запускаем сервисы с помощью Docker

Запускаем службы (RabbitMQ и Redis) с помощью докера:
$ docker-compose up -d 

Убедиться, что контейнеры запущены и работают:
$ docker ps

Должны увидеть две запущенные службы и дополнительную информацию для каждой из них

Запустим celery worker, затем попробуем запустить некоторые задачи с интерактивной оболочкой python

Запуск воркера Celery
$ celery -A Tasks Worker -l info --pool=solo

Теперь запустим задачу из интерактивной оболочки:
$ python
---------------------------------
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import say_hello
>>> say_hello.delay("Test_Arg_Name_1")
<AsyncResult: 32as92a9-e7eb-22r1-9b22-x18b20d9d4g1>

Мы вызвали функцию say_hello, используя .delay(), затем передали аргумент Test_Arg_Name_1

Затем мы получаем AsyncResult, который является задачей, которая была передана брокеру

После этого данная задача будет обработана и завершена Celery в фоновом режиме

В логах воркера получена задача

Через 5 секунд (sleep(5)) сообщит, что задача успешно завершена

Повторим запуск задачи, но теперь добавим хранилище результатов Redis

В оболочке python давайте сохраним результат в переменной, а затем запустим ее свойства

$ python
---------------------------------
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import say_hello
>>> result = say_hello.delay("Test_Arg_Name_1")
>>> result.status
'PENDING'
>>> result.status
'SUCCESS'
>>> result.get()
'Hello Test_Arg_Name_1'

PENDING и SUCCESS - состояния, которые мы успеваем увидеть т..к сделали исскуственную задержку sleep(5)

Если б не был настроен бэкэнд Redis на Celery, мы не могли получить доступ к свойствам или функциям

По умолчанию не хранятся состояния, но поскольку Redis есть, мы можем получать информацию о задачах

 


Комментарии пользователей

Эту новость ещё не комментировалиНаписать комментарий
Анонимам нельзя оставоять комментарии, зарегистрируйтесь!

Контакты Группа ВК Сборник материалов по Cisco, Asterisk, Windows Server, Python и Django, SQL и T-SQL, FreeBSD и LinuxКод обмена баннерами Видео к IT статьям на YoutubeВидео на другие темы Смотреть
Мои друзья: Советы, помощь, инструменты для сис.админа, статическая и динамическая маршрутизация, FreeBSD

© Snakeproject.ru создан в 2013 году.
При копировании материала с сайта - оставьте ссылку.

Рейтинг@Mail.ru
Рейтинг@Mail.ru Яндекс.Метрика





Поддержать автора и проект