По сути, очереди обработки заданий нужны для того, чтобы отложить выполнение некоторых дорогих операций. К примеру, если у нас веб-сервер, то важно, чтобы при клике или отправке формы пользователь получал ответ мгновенно. И не важно, на сколько сложное действие он попросил сервер выполнить:
- отправить письмо верификации,
- сгенерировать изображения (например, для предпросмотра) по отправленному,
- собрать отчёт по продажам за указанный период и выгрузить в xls,
и многое другое... Главное - пользователь хочет выполнить долгую операцию, но не хочет ждать отклик от сервера. К слову, бывает, что сервер не успевает отдать ответ за timeout сервера — пользователь получит ошибку 504 - Gateway Timeout, что также нехорошо.
И тут нам на помощь приходят очереди обработки заданий и воркеры (worker - рабочий), которые позволяют отложить выполнение долгой операции. Посмотрим, как их можно сделать на Python.
В данном примере мы сделаем небольшой веб-сервер на Flask, который будет создавать изображение для предпросмотра (карточки заметки в блоге / новостном портале / магазине) по какому-то оригинальному изображению.
Структура проекта:
.
│ db.py
│ tasks.py
│ web.py
│ worker.py
│
├───templates
│ index.html
│ tasks.html
│
├───uploads
└───__pycache__
Для начала напишем сам веб-сайт:
# web.py
from flask import Flask, render_template, request, send_from_directory
from werkzeug.utils import secure_filename
from db import Session, PhotoModel
from tasks import init_redis, create_task_resize_photo, get_resize_photo_info
UPLOAD_FOLDER = 'uploads'
app = Flask(__name__)
app.debug = True
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
f = request.files['file']
file_path = f'{UPLOAD_FOLDER}/{secure_filename(f.filename)}'
f.save(file_path)
with Session() as session:
photo = PhotoModel(url=f'/{file_path}')
session.add(photo)
session.commit()
create_task_resize_photo(app.config['redis'], photo.id)
with Session() as session:
return render_template('index.html', photos=session.query(PhotoModel).all())
@app.route('/uploads/<filename>')
def send_uploaded_file(filename):
return send_from_directory(UPLOAD_FOLDER, filename)
@app.route('/admin/task')
def task_list():
queue_size, last_items = get_resize_photo_info(app.config['redis'])
return render_template('tasks.html', queues=[{
'name': 'Ресайзилка',
'size': queue_size,
'last_items': last_items,
}])
if __name__ == '__main__':
app.config['redis'] = init_redis()
app.run()
Функция index
будет обрабатывать GET и POST запросы на url http://localhost:5000/
(по этому адресу по умолчанию стартанёт Flask). В случае запроса получения
страницы она выведет страницу со списком уже загруженных фотографий и формой загрузки. В случае отправки на неё
файла - сохранит файл на файловой системе, добавит запись в базу данных, а также создаст задачу на ресайз данной
"фотографии".
Далее - функция send_uploaded_file
- по запросу отпрвляет на клиент запрошенный файл. То есть по запросу
http://localhost:5000/uploads/900913.png
она из директории UPLOAD_FOLDER
отправит файл 900913.png
, если
он есть.
Также добавил функцию task_list
для просмотра длины очереди обработки изображений и части текущих задач.
Сами файлы html шаблонов, что использованы в web.py
:
<!-- templates/index.html -->
<!doctype html>
<html lang="ru">
<head>
<meta charset="utf-8" />
<title>Привет, Flask / с worker-ами</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/js/bootstrap.min.js" integrity="sha384-+YQ4JLhjyBLPDQt//I+STsc9iw4uQqACwlvpslubQzn4u2UU2UFM80nGisd026JF" crossorigin="anonymous"></script>
<style>
.content { margin: 0 auto; padding: 20px; max-width: 800px; }
img { max-width: 400px; max-height: 300px; }
</style>
</head>
<body>
<div class="content">
<h1>Пример работы с воркерами</h1>
<div>
<h2>Список загруженных</h2>
{% for photo in photos %}
<ul>
<li>
<a href="{{ photo.url }}">Исходный</a>
<div>
<img src="{{ photo.url }}">
</div>
</li>
<li>
<a href="{{ photo.url_preview }}">Для предпросмотра</a>
<div>
<img src="{{ photo.url_preview }}">
</div>
</li>
</ul>
<hr>
{% endfor %}
</div>
<h2>Форма для загрузки</h2>
<form method="post" enctype="multipart/form-data">
<div class="form-group">
<input class="form-control" type="file" id="file" name="file" multiple placeholder="Выберете файл">
</div>
<div class="form-group">
<button class="btn btn-primary mb-2">Отправить</button>
</div>
<a href="/admin/task">Состояние очереди</a>
</form>
</div>
</body>
</html>
— тот самый templates/index.html
для отображения списка фотографий и формы загрузки. Для шаблонизации используется
библиотека Python — Jinja2
.
И для просмотра статистики по загруженности очереди ресайза — templates/tasks.html
:
<!-- templates/tasks.html -->
<!doctype html>
<html lang="ru">
<head>
<meta charset="utf-8" />
<title>Привет, Flask / с worker-ами</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/js/bootstrap.min.js" integrity="sha384-+YQ4JLhjyBLPDQt//I+STsc9iw4uQqACwlvpslubQzn4u2UU2UFM80nGisd026JF" crossorigin="anonymous"></script>
</head>
<body>
<div class="content" style="margin: 0 auto; padding: 20px; max-width: 800px;">
<h1>Состояние очереди</h1>
<div>
{% for queue in queues %}
<p>{{ queue.name }}</p>
<ul>
<li>Размер: {{ queue.size }}</li>
<li>Последние задания: {{ queue.last_items }}</li>
</ul>
<hr>
{% endfor %}
</div>
</div>
</body>
</html>
Теперь взглянем на нашу базу данных — файл db.py
:
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///example.db', echo=True)
Base = declarative_base()
Session = sessionmaker(bind=engine)
class PhotoModel(Base):
__tablename__ = 'photo'
id = Column(Integer, primary_key=True)
url = Column(String)
url_preview = Column(String)
def __init__(self, url):
self.url = url
def __repr__(self):
return f'<{self.__class__.__name__} #{self.id}>'
Base.metadata.create_all(engine)
— простенька модель на sqlalchemy.orm. Имеет 2 поля с полезными данными:
url
— для части url к оригинальному изображению (тому, что загрузил пользователь),url_preview
— к заресайзенному изображению для предпросмотра (тому, что будет делать воркер).
Осталось рассмотреть модуль для работы с Redis очередью перед тем, как посмотреть на воркер. Итак, tasks.py
:
import redis
RESIZE_PHOTO_QUEUE = 'resize_photo_queue'
def init_redis():
pool = redis.ConnectionPool(port=6379, db=0)
conn = redis.Redis(connection_pool=pool)
conn.ping()
return pool
def create_task_resize_photo(pool, photo_id):
conn = redis.Redis(connection_pool=pool)
conn.rpush(RESIZE_PHOTO_QUEUE, photo_id) # 4 -> b'4'
def get_task_resize_photo(pool):
conn = redis.Redis(connection_pool=pool)
return conn.lpop(RESIZE_PHOTO_QUEUE)
def get_resize_photo_info(pool):
conn = redis.Redis(connection_pool=pool)
queue_size = conn.llen(RESIZE_PHOTO_QUEUE)
last_ten = conn.lrange(RESIZE_PHOTO_QUEUE, 0, 10)
return queue_size, last_ten
В RESIZE_PHOTO_QUEUE
держим название ключа Redis, который будет отвечать за нашу очередь.
Функция init_redis
создаёт пул соединений (чтобы не по одному ходить в базу данных), проверяет (пингует)
сервер на доступность.
Создаём задачу на изменение размера изображения в create_task_resize_photo
. Просто кладём справа (rpush
) в Redis список
идентификатор "фото".
В get_task_resize_photo
берём слева (lpop
) из очереди идентификатор "фото" для обработки. Получается такая "конвеерная лента":
веб-сервер на неё кладёт идентификатор объекта, который надо изменить, а воркер берёт с "ленты" его и делает работу.
Функция же get_resize_photo_info
нам нужна для страницы, на которой мы выводим информацию о состоянии очереди.
llen
покажет размер очереди (загруженность). lrange
покажет часть элементов из очереди.
Сам же файл worker.py
будет выглядеть следующим образом:
from PIL import Image
from db import Session, PhotoModel
from tasks import init_redis, get_task_resize_photo
PREVIEW_SIZE = (100, 100)
def work(pool):
photo_id = get_task_resize_photo(pool)
if not photo_id:
return sleep(1)
photo_id = int(photo_id)
with Session() as session:
photo = session.query(PhotoModel).get(photo_id)
print(f"Resize {photo.url}...")
photo.url_preview = f'{photo.url}_preview.png'
with Image.open(f'.{photo.url}') as image:
image.thumbnail(PREVIEW_SIZE)
image.save(f'.{photo.url_preview}', "PNG")
session.add(photo)
session.commit()
def main():
pool = init_redis()
while True:
work(pool)
if __name__ == '__main__':
main()
— крутимся в вечном цикле и пытаемся получить очередной id
фото. Если "конвеер" пустой — вернётся None
—
чтобы не грузить процессор, поспим секунду. Если же что-то пришло — считаем, что это идентификатор фото,
получаем по нему фото, генерируем путь до превьюшки, ресайзим изображение с помощью PIL
(Python пакет pillow
).
Сохраняем.
Итого мы разделили выполнение задачи на 2 сервера. Потенциально, если очередь будет быстро заполняться — можно поднять ещё несколько воркеров, который также примутся за разгребание очереди.
Также стоит сказать, что лучше не открывать соединение на каждый запрос. Сама реализация очереди также наивна —
а что будет, если воркер упадёт, недоделав свою работу? Задача потеряется. Чтобы не терялась есть Redis команда
RPOPLPUSH
— взять справа у одной очереди и вставить слева в другую. Так можно создать ещё очередь "in progress",
чтобы отслеживать необработанные задачи.
В целом, лучше использовать готовые инструменты, например модуль rq. Однако, полезно иметь представление о том, как они работают. Для этого и была написана данная заметка.