Программы
Очередь обработки Redis на Python

Очередь обработки Redis на Python

В базе данных Redis есть занимательная структура данных — список. Он подходит для разных задач, но в этой заметке речь пойдёт только об очереди обработки заданий.

По сути, очереди обработки заданий нужны для того, чтобы отложить выполнение некоторых дорогих операций. К примеру, если у нас веб-сервер, то важно, чтобы при клике или отправке формы пользователь получал ответ мгновенно. И не важно, на сколько сложное действие он попросил сервер выполнить:

  • отправить письмо верификации,
  • сгенерировать изображения (например, для предпросмотра) по отправленному,
  • собрать отчёт по продажам за указанный период и выгрузить в xls,

и многое другое... Главное - пользователь хочет выполнить долгую операцию, но не хочет ждать отклик от сервера. К слову, бывает, что сервер не успевает отдать ответ за timeout сервера — пользователь получит ошибку 504 - Gateway Timeout, что также нехорошо.

И тут нам на помощь приходят очереди обработки заданий и воркеры (worker - рабочий), которые позволяют отложить выполнение долгой операции. Посмотрим, как их можно сделать на Python.

В данном примере мы сделаем небольшой веб-сервер на Flask, который будет создавать изображение для предпросмотра (карточки заметки в блоге / новостном портале / магазине) по какому-то оригинальному изображению.

Структура проекта:

.
│   db.py
│   tasks.py
│   web.py
│   worker.py
│
├───templates
│       index.html
│       tasks.html
│
├───uploads
└───__pycache__

Для начала напишем сам веб-сайт:

# web.py
from flask import Flask, render_template, request, send_from_directory
from werkzeug.utils import secure_filename

from db import Session, PhotoModel
from tasks import init_redis, create_task_resize_photo, get_resize_photo_info

UPLOAD_FOLDER = 'uploads'

app = Flask(__name__)
app.debug = True


@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'POST':
        f = request.files['file']
        file_path = f'{UPLOAD_FOLDER}/{secure_filename(f.filename)}'
        f.save(file_path)

        with Session() as session:
            photo = PhotoModel(url=f'/{file_path}')
            session.add(photo)
            session.commit()
            create_task_resize_photo(app.config['redis'], photo.id)

    with Session() as session:
        return render_template('index.html', photos=session.query(PhotoModel).all())


@app.route('/uploads/<filename>')
def send_uploaded_file(filename):
    return send_from_directory(UPLOAD_FOLDER, filename)


@app.route('/admin/task')
def task_list():
    queue_size, last_items = get_resize_photo_info(app.config['redis'])

    return render_template('tasks.html', queues=[{
        'name': 'Ресайзилка',
        'size': queue_size,
        'last_items': last_items,
    }])


if __name__ == '__main__':
    app.config['redis'] = init_redis()
    app.run()

Функция index будет обрабатывать GET и POST запросы на url http://localhost:5000/ (по этому адресу по умолчанию стартанёт Flask). В случае запроса получения страницы она выведет страницу со списком уже загруженных фотографий и формой загрузки. В случае отправки на неё файла - сохранит файл на файловой системе, добавит запись в базу данных, а также создаст задачу на ресайз данной "фотографии".

Далее - функция send_uploaded_file - по запросу отпрвляет на клиент запрошенный файл. То есть по запросу http://localhost:5000/uploads/900913.png она из директории UPLOAD_FOLDER отправит файл 900913.png, если он есть.

Также добавил функцию task_list для просмотра длины очереди обработки изображений и части текущих задач.

Сами файлы html шаблонов, что использованы в web.py:

<!-- templates/index.html -->
<!doctype html>
<html lang="ru">
<head>
    <meta charset="utf-8" />
    <title>Привет, Flask / с worker-ами</title>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/js/bootstrap.min.js" integrity="sha384-+YQ4JLhjyBLPDQt//I+STsc9iw4uQqACwlvpslubQzn4u2UU2UFM80nGisd026JF" crossorigin="anonymous"></script>
    <style>
        .content { margin: 0 auto; padding: 20px; max-width: 800px; }
        img { max-width: 400px; max-height: 300px; }
    </style>
</head>
<body>
<div class="content">
    <h1>Пример работы с воркерами</h1>
    <div>
        <h2>Список загруженных</h2>
        {% for photo in photos %}
            <ul>
                <li>
                    <a href="{{ photo.url }}">Исходный</a>
                    <div>
                        <img src="{{ photo.url }}">
                    </div>
                </li>
                <li>
                    <a href="{{ photo.url_preview }}">Для предпросмотра</a>
                    <div>
                        <img src="{{ photo.url_preview }}">
                    </div>
                </li>
            </ul>
            <hr>
        {% endfor %}
    </div>

    <h2>Форма для загрузки</h2>
    <form method="post" enctype="multipart/form-data">
        <div class="form-group">
            <input class="form-control" type="file" id="file" name="file" multiple placeholder="Выберете файл">
        </div>
        <div class="form-group">
            <button class="btn btn-primary mb-2">Отправить</button>
        </div>
        <a href="/admin/task">Состояние очереди</a>
    </form>
</div>
</body>
</html>

— тот самый templates/index.html для отображения списка фотографий и формы загрузки. Для шаблонизации используется библиотека Python — Jinja2.

И для просмотра статистики по загруженности очереди ресайза — templates/tasks.html:

<!-- templates/tasks.html -->
<!doctype html>
<html lang="ru">
<head>
    <meta charset="utf-8" />
    <title>Привет, Flask / с worker-ами</title>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/js/bootstrap.min.js" integrity="sha384-+YQ4JLhjyBLPDQt//I+STsc9iw4uQqACwlvpslubQzn4u2UU2UFM80nGisd026JF" crossorigin="anonymous"></script>
</head>
<body>
<div class="content" style="margin: 0 auto; padding: 20px; max-width: 800px;">
    <h1>Состояние очереди</h1>
    <div>
        {% for queue in queues %}
            <p>{{ queue.name }}</p>
            <ul>
                <li>Размер: {{ queue.size }}</li>
                <li>Последние задания: {{ queue.last_items }}</li>
            </ul>
            <hr>
        {% endfor %}
    </div>
</div>
</body>
</html>

Теперь взглянем на нашу базу данных — файл db.py:

from sqlalchemy import Column, Integer, String, create_engine, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

engine = create_engine('sqlite:///example.db', echo=True)
Base = declarative_base()
Session = sessionmaker(bind=engine)


class PhotoModel(Base):
    __tablename__ = 'photo'
    id = Column(Integer, primary_key=True)
    url = Column(String)
    url_preview = Column(String)

    def __init__(self, url):
        self.url = url

    def __repr__(self):
        return f'<{self.__class__.__name__} #{self.id}>'


Base.metadata.create_all(engine)

— простенька модель на sqlalchemy.orm. Имеет 2 поля с полезными данными:

  1. url — для части url к оригинальному изображению (тому, что загрузил пользователь),
  2. url_preview — к заресайзенному изображению для предпросмотра (тому, что будет делать воркер).

Осталось рассмотреть модуль для работы с Redis очередью перед тем, как посмотреть на воркер. Итак, tasks.py:

import redis

RESIZE_PHOTO_QUEUE = 'resize_photo_queue'


def init_redis():
    pool = redis.ConnectionPool(port=6379, db=0)
    conn = redis.Redis(connection_pool=pool)
    conn.ping()
    return pool


def create_task_resize_photo(pool, photo_id):
    conn = redis.Redis(connection_pool=pool)
    conn.rpush(RESIZE_PHOTO_QUEUE, photo_id)  # 4 -> b'4'


def get_task_resize_photo(pool):
    conn = redis.Redis(connection_pool=pool)
    return conn.lpop(RESIZE_PHOTO_QUEUE)


def get_resize_photo_info(pool):
    conn = redis.Redis(connection_pool=pool)
    queue_size = conn.llen(RESIZE_PHOTO_QUEUE)
    last_ten = conn.lrange(RESIZE_PHOTO_QUEUE, 0, 10)
    return queue_size, last_ten

В RESIZE_PHOTO_QUEUE держим название ключа Redis, который будет отвечать за нашу очередь.

Функция init_redis создаёт пул соединений (чтобы не по одному ходить в базу данных), проверяет (пингует) сервер на доступность.

Создаём задачу на изменение размера изображения в create_task_resize_photo. Просто кладём справа (rpush) в Redis список идентификатор "фото".

В get_task_resize_photo берём слева (lpop) из очереди идентификатор "фото" для обработки. Получается такая "конвеерная лента": веб-сервер на неё кладёт идентификатор объекта, который надо изменить, а воркер берёт с "ленты" его и делает работу.

Функция же get_resize_photo_info нам нужна для страницы, на которой мы выводим информацию о состоянии очереди. llen покажет размер очереди (загруженность). lrange покажет часть элементов из очереди.

Сам же файл worker.py будет выглядеть следующим образом:

from PIL import Image

from db import Session, PhotoModel
from tasks import init_redis, get_task_resize_photo

PREVIEW_SIZE = (100, 100)


def work(pool):
    photo_id = get_task_resize_photo(pool)

    if not photo_id:
        return sleep(1)

    photo_id = int(photo_id)

    with Session() as session:
        photo = session.query(PhotoModel).get(photo_id)
        print(f"Resize {photo.url}...")
        photo.url_preview = f'{photo.url}_preview.png'

        with Image.open(f'.{photo.url}') as image:
            image.thumbnail(PREVIEW_SIZE)
            image.save(f'.{photo.url_preview}', "PNG")

        session.add(photo)
        session.commit()


def main():
    pool = init_redis()

    while True:
        work(pool)


if __name__ == '__main__':
    main()

— крутимся в вечном цикле и пытаемся получить очередной id фото. Если "конвеер" пустой — вернётся None — чтобы не грузить процессор, поспим секунду. Если же что-то пришло — считаем, что это идентификатор фото, получаем по нему фото, генерируем путь до превьюшки, ресайзим изображение с помощью PIL (Python пакет pillow). Сохраняем.

Итого мы разделили выполнение задачи на 2 сервера. Потенциально, если очередь будет быстро заполняться — можно поднять ещё несколько воркеров, который также примутся за разгребание очереди.

Также стоит сказать, что лучше не открывать соединение на каждый запрос. Сама реализация очереди также наивна — а что будет, если воркер упадёт, недоделав свою работу? Задача потеряется. Чтобы не терялась есть Redis команда RPOPLPUSH — взять справа у одной очереди и вставить слева в другую. Так можно создать ещё очередь "in progress", чтобы отслеживать необработанные задачи.

В целом, лучше использовать готовые инструменты, например модуль rq. Однако, полезно иметь представление о том, как они работают. Для этого и была написана данная заметка.

Изображение Python 3.11. Что нового?