Про асинхронность я уже писал ранее, но там было скорее концептуальное описание асинхронности как неблокируемости. Сейчас же хочется немного отойти от абстрактных рассказов, от асинхронности в языках программирования и поговорить про асинхронность в архитектурных решениях.
Асинхронное API
Рассмотрим, наверное, самое базовое решение. Мы делаем какое-то HTTP API, которым будут пользоваться клиенты из вне. Вопросы формата запроса: какая-то модификация REST, XML-RPC, JSCON-RPC, gRPC, "чистый HTTP" – не суть. Главное то, что мы не всегда знаем, за какое время будет дан ответ, не порвётся ли соединение по timeout-у (к слову, возможно поэтому популярна заметка про увеличение timeout-а).
Создание запроса
А раз мы не знаем, когда ответим – лучше отдавать не сам ответ, а обещание ответить. Уж это то мы можем! Достаточно сам запрос положить к себе в локальное хранилище, сгенерировать id запроса, которое будет как раз обещанием. Id можно как случайное – тогда нужно будет хранить ещё и отображение id на запрос, так и генерировать его из самого запроса по ключевым параметрам, которые будут уникальны для каждого потенциального запроса.
Например, приходит запрос на получение статистики посещаемости сайта. А статистика у нас сложная, всякие воронки считаются, да и разные агрегирующие функции отрабатывают не мгновенно. Сам запрос мы кладём в какую-то очередь, к примеру, Redis, AMQP, или даже в таблицу БД. Предположим, запрос сложный – генерируем случайный id, например, через UUID или GUID. Тогда при сохранении запроса в базе/очереди/key-value хранилище добавляем ключевое поле "id" с нашим идентификатором. Сам этот идентификатор отдаём клиенту.
Считаем, что сохранение в очередь происходит крайне быстро, да и генерация id быстра – делаем это синхронно, по времени ответа почти незаметно для клиента.
Статус запроса
Что же наш клиент должен делать с полученным идентификатором? Ему то нужен ответ на запрос, а не идентификатор запроса. А клиент должен с некоторой периодичностью проверять статус запроса – для этого посылать на API уже другой запрос с полученным идентификатором.
Сначала, например, статус "в обработке", потом "готов". А может быть и статус ошибки, если с запросом что-то не так, либо же наш сервер где-то сломался. Вообще, статусов может быть великое множество, но базово – "в очереди" и "готов к получению".
Сам же запрос сидит в очереди, пока до него не доберётся уже отдельный обработчик запросов (worker), который уже без ограничений HTTP timeout-а займётся им. По завершении работы, он положит результат в какое-то хранилище, а статус запроса переведёт в "готово".
Получение результата
Когда же наш обработчик подготовил ответ, клиент на очередной запрос статуса получает ответ "готово", и делает уже 3-ий тип запроса – на получение результата.
Сам же результат может быть сохранён в БД, если это какой-то JSON / XML / etc ответ. Или же, как в примере с подготовкой отчёта статистики использования сайта – на файлохранилище в виде .xls / .pdf файла. И в зависимости от этого в "результате" будут либо данные, либо ссылка, либо ещё что-то.
При этом заметьте, не каждом этапе мы вполне можем пользоваться синхронным кодом, само же API будет асинхронным.
Очереди для асинхронной обработки запросов
Помимо возможности создания таких очередей в памяти для обеспечения межпроцессного взаимодействия, определённо стоит упомянуть и специализированные сервера, которые обеспечивают связь нескольких сетевых узлов.
Очереди на базах данных
Если у Вас уже есть приложение, вполне возможно, вы уже используете какую-то базу данных. Пока нагрузка не велика, для организации очереди можно использовать всю ту же базу данных. Чтобы посчитать примерную нагрузку на БД, берём примерное количество сообщений. На создание и забор результата придётся минимум 4 записи:
- запись запроса шлюзом API,
- запись "забрано в работу таким-то воркером",
- запись результата,
- удаление после отдачи.
Здесь мы не рассматриваем вопрос кеширования результатов – это отдельная большая и больная тема.
По чтению же сложно что-то обещать в целом, ибо есть ещё запросы статуса, которым может быть заспамлен шлюз. С этим можно бороться лимитами и, например, троттлингом, но опять же – об этом не сегодня.
Итак, не считая запросов статуса:
- чтение воркером запроса,
- чтение шлюзом результата.
Накинем ещё 1 чтение соседним освободившимся воркером, хотя под нагрузкой такое будет не часто. Ну и плюс 1, если БД не позволяет подписаться на создание новых записей, и вы проверяете наличие новых заданий поллингом. Итого – от 4-х.
Но больнее всего будут всё же записи, ибо нам понадобится создать и поддерживать 1-2 индекса на поля идентификатора запроса и последовательности вставки. Последний можно опустить, если вам порядок забора из "очереди не важен", либо вы сделали хитрый идентификатор.
В общем, это довольно дорогое удовольствие. Можете в интернетах найти benchmark
на Вашу БД, либо протестировать самому, например, с помощью pgbench
.
На вскидку – сотни/тысячи обработанных сообщений в секунду.
Очереди на Redis
Следующая технология, на которую стоит взглянуть – key-value хранилище Redis. И тут нас больше интересует не то, что это бытрое key-value, а 2 фичи, которые есть у Redis.
Первый вариант передачи сообщений – pub-sub (да, реализация шаблона проектирования Publisher-Subsciber). Сообщения будут передаваться на весь кластер, на все подписанные клиенты (worker-ы). Но этот вариант лично мне не очень нравится, ведь сообщения:
- летят на всех подписанных, а значит сразу несколько могут его взять в работу,
- если все воркеры отвалились, сообщения пролетят мимо, и никто их не обработает.
А вот второй вариант выглядит вполне красиво: в Redis можно в качестве значения использовать список (linked list), который идеально подходит для очередей.
Если примитивно и на пальцах, то создаём:
- очередь для входящих сообщений,
- очередь обрабатываемых,
- какое-нибудь хранилище для состояния / результатов обработки, например, hash, или же обычный скаляр на каждый запрос.
Порядок обработки будет примерно следующим:
- В хранилище шлюз создаёт запись о запросе с ключом – идентификатором.
- Запрос помещается в очередь входящих.
- Воркер атомарно (командой RPOPLPUSH) переносит запрос из входящих в обрабатываемые.
- Когда воркер обработал сообщение – результат в хранилище. Можно прямо в запись с ключом – идентификатором.
- Воркер удаляет (помним, что
LREM
работает за линейное время) запрос из очереди обрабатываемых.
К этому можно ещё прикрутить очередь ошибок, чтобы отдельный сервис следил за воркерами или же очередью обрабатываемых. Ну и переносил сообщения в очередь ошибок, если долго висит / воркер прислал сообщение о перезапуске (упал в процессе обработки).
Каждый пункт – чрезвычайно быстрый запрос к серверу Redis. Более того, часть пунктов можно исключить, обложив Redis скриптами на Lua (поддерживается "из коробки"). Скорее всего, Вы без проблем найдёте готовую библиотеку под Ваш стек технологий.
Из минусов – высокую скорость Redis имеет не "бесплатно". По умолчанию, Redis хранит данные только в памяти. А значит падение / лимит памяти может испортить ситуацию. Возможные варианты решения этой проблемы:
- Настроить таки сохранения состояния на диск в .rdb файл.
- Настроить реплицирование в соседний Redis-сервер, да и вообще кластер.
Оба варианта замедлят работу. Не катастрофически, но надо иметь в виду.
Benchmark-и по работе Redis можно легко найти в тех же интернетах. По сравнению с БД результат отличается в лучшую сторону в десятки-сотни раз.
Очереди сообщений на AMQP
Здесь реализаций немало: ActiveMQ, RabbitMQ (субъективно, самая удобная), ZeroMQ... "MQ" – это как раз "message queue" – то есть те самые, нужные нам "очереди сообщений". И это не просто одна из фич, как в Redis, а целый протокол и набор решений именно для организации очередей сообщений. Так что развернуться здесь можно куда шире, сами же решения проще. Далее – частичный пересказ заметки про AMQP на примере.
Итак, нам понадобится:
- Создать exchange (точка обмена сообщениями / маршрутизатор в логике AMQP).
- Создать queue (сервер очереди) входящих сообщений под нашу задачу.
- На exchange связать очередь с нужным routing-key, чтобы exchange знал, куда слать какие сообщения (по сути – адрес назначения).
- Какое-либо хранилище для результатов.
А порядок обработки будет следующим:
- Шлюз создаёт сообщение с нужным routing-key, шлём в exchange.
- Воркер из очереди (queue) получает сообщение.
- Воркер обрабатывает сообщение, складывает результат в хранилище.
- Воркер шлёт ack-ответ в MQ, чтобы та поняла, что всё хорошо и удалила у себя сообщение.
Сервер queue отправит сообщение только одному воркеру, так что проблем с "помечанием" сообщения нет. Если все воркеры отвалились, сервер сообщений (queue) будет хранить сообщения, пока воркеры не оживут. Воркерам не надо опрашивать очередь – устанавливается постоянное TCP соединение, и сама очередь будет слать сообщения по готовности.
При этом exchange и очередь можно довольно гибко настроить. Например:
- задать как раз ожидание ack,
- задать durable – хранение сообщений в постоянной памяти (для защиты от потери данных при падении сервера),
- задать DMQ (dead message queue) для сообщений, которые не получили ACK в нужное время, либо количество попыток отправить превышено.
В общем, только прочитай мануал – всевозможные настройки.
По скорости вполне сравним с Redis. Нормально (не просто) кластеризуется, строятся целые иерархии MQ-серверов. Для системы, где ходят сотни миллионов сообщений в день – сойдёт.
Очередь или всё же журнал сообщений на Kafka
Ещё один вариант доставки сообщения до воркера – Apache Kafka. Но это, опять же, не очередь, а журнал сообщений (commit log). Иными словами, Kafka вообще никак не помогает в поддержании статуса сообщения (доставлено/не доставлено).
Представьте кучу подряд идущих сообщений. В конец сообщения дописываются, из начала удаляются (по старости). Клиент же хранит "указатель" на какое-то место в этой последовательности, по которому читает какое-то количество сообщений, передвигает дальше, снова читает.
Это краткое описание работы с одной партицией – с одним конкурирующим клиентом. Если хотите конкуренцию нескольких клиентов (воркеров) – сделайте несколько этих самых партиций, на каждую посадите по клиенту, а продюсер пусть раскидывает сообщения, например, поочерёдно по партициям.
Отличий от работы с очередями при использовании готовых библиотек особо не будет, кроме того, что можно "заглянуть" в прошлое. Ну и обработка ошибок осуществляется приложением, а не Kafka.
Скорость высокая, масштабируемость отличная.
Проблемы обработки задач с помощью очередей
Всё это, конечно, хорошо, но, на самом деле, нет. Есть множество ограничений у каждого решения. И это даже не вопросы производительности, а вопрос соблюдения определённых гарантий.
Гарантия доставки сообщений
Представьте, что вы отправили сообщение. Сколько раз оно будет принято? Казалось бы, в сферическом вакууме количество отправок должно совпасть с количеством получений... Но вспомните хотя бы ту же "Почту" – вечно что-то теряется. Так и с распределёнными системами – где-то сеть моргнула, где-то сервис упал...
Так что некоторые сервера очередей, протоколы передачи данных реализуют подходы "at-least-once" (доставлено 1 или более раз), "at-most-once" (1 или менее раз). Но, к примеру, при передаче сообщения о списании средств с банковского счёта, очень не хотелось бы попадать на случаи, когда не ровно 1 раз ("exactly-once") было доставлено сообщение.
Сами же подходы at-least-once, at-most-once довольно просты: шлём сообщение, ждём подтверждения доставки / не ждём подтверждения.
Если ждём – такая ситуация всё ломает: сообщение было принято, обработано, но отправить "ack" не смогло... Значит надо перепослать – и уже 2 доставки вместо одной.
Если не ждём – ещё очевидней: все получатели лежат, никто не получил...
Что же нужно делать для "exactly-once"? Добавить адрес отправителя и порядковый номер сообщения в заголовок. После чего работать равно как и при "at-least-once".
Единственное отличие – на клиенте проверять, не было ли ещё от этого отправителя сообщения с таким порядковым номером. Так с давних времён поступает TCP для транспорта потоков данных по сети. Так поступили и в той же Kafka.
Но тут есть опять же проблема – маршрут сообщений (отправитель - получатель) не должен меняться во время потока передачи данных.
Обработка ошибок доставки и клиента
Как мы уже договорились ранее, сеть моргает, сервисы падают. Как нам жить в такой неспокойной среде? Даже не так. Как сервер очередей может понять, что что-то пошло не так?
Можно, конечно, прикрутить к очередям ещё и возможность лазить по воркерам, смотреть dmesg, в целом интегрировать системы мониторинга. Но делов...
Пожалуй, самые распространённые подходы к определению, что "что-то пошло не так" – это TTL и MAX_RETRIES. То есть, если за определённое время так и не пришёл ответ об успешной обработке сообщения – пытаемся какое-то количество раз перепослать это сообщение. Если превышено максимальное количество повторных отправок – всё плохо.
Ну а что делать, если мы определили, что "плохо" – я выше описывал в списке реализаций очередей. То есть, помещаем в отдельную очередь ошибок, чтобы разобраться в причинах и перепослать всё по-новой.
Важно не забыть проверить, что система настроена так, чтобы "плохие" сообщение не просто выкидывались, а попадали в какой-то "dead message queue".
Порядок доставки сообщений, порядок доставки результата
Ещё одна проблема, которая встаёт при организации систем доставки сообщений – их порядок. К примеру, если вы пишете чат-комнату, то было бы неплохо, чтобы клиенты получили сообщения в порядке их написания.
Как раз одна из причин, по которой для доставки сообщений используются очереди – это их принцип "FIFO" (первый вошедший выйдет первым), что соответствует житейскому пониманию справедливой очереди.
В простой модели, где очередь одна и получатель один (воркер) – всё просто.
Теперь представим, что сообщения мы получаем почему-то из нескольких очередей. Будет ли доставка последовательна? Кто позаботился о том, чтобы сообщения были пронумерованы в порядке появления их в этом мире?
Теперь про доставку результата. Есть одна очередь, её конкурентно обрабатывают несколько воркеров. С одной ли скоростью они обрабатывают каждое сообщение? Когда они отдадут результат куда-то – будет ли порядок ответов соответствовать порядку запросов?
И тут надо понять – важно ли это. И если да – то придётся ставить ещё несколько ограничений и звено в цепи обработки, которое будет буферизировать и перетасовывать при необходимости. Либо же это как-то заложить в протокол обмена сообщениями на уровне логики приложения.
На самом деле, существует куда больше проблем, да и технических решений. Сюда и ACID транзакций, и CAP в целом для распределённых систем, и прочие занимательные проблемы. Хотелось базово описать, как живёт асинхронность на уровне межсистемного взаимодействия, за счёт чего, и какие неочевидные проблемы там можно найти. Надеюсь, это кому-то поможет.