Программы
Асинхронность и очереди в распределённых системах

Асинхронность и очереди в распределённых системах

Немного примеров работы с асинхронностью в распределённых системах

Про асинхронность я уже писал ранее, но там было скорее концептуальное описание асинхронности как неблокируемости. Сейчас же хочется немного отойти от абстрактных рассказов, от асинхронности в языках программирования и поговорить про асинхронность в архитектурных решениях.

Асинхронное API

Рассмотрим, наверное, самое базовое решение. Мы делаем какое-то HTTP API, которым будут пользоваться клиенты из вне. Вопросы формата запроса: какая-то модификация REST, XML-RPC, JSCON-RPC, gRPC, "чистый HTTP" – не суть. Главное то, что мы не всегда знаем, за какое время будет дан ответ, не порвётся ли соединение по timeout-у (к слову, возможно поэтому популярна заметка про увеличение timeout-а).

Создание запроса

А раз мы не знаем, когда ответим – лучше отдавать не сам ответ, а обещание ответить. Уж это то мы можем! Достаточно сам запрос положить к себе в локальное хранилище, сгенерировать id запроса, которое будет как раз обещанием. Id можно как случайное – тогда нужно будет хранить ещё и отображение id на запрос, так и генерировать его из самого запроса по ключевым параметрам, которые будут уникальны для каждого потенциального запроса.

Например, приходит запрос на получение статистики посещаемости сайта. А статистика у нас сложная, всякие воронки считаются, да и разные агрегирующие функции отрабатывают не мгновенно. Сам запрос мы кладём в какую-то очередь, к примеру, Redis, AMQP, или даже в таблицу БД. Предположим, запрос сложный – генерируем случайный id, например, через UUID или GUID. Тогда при сохранении запроса в базе/очереди/key-value хранилище добавляем ключевое поле "id" с нашим идентификатором. Сам этот идентификатор отдаём клиенту.

Считаем, что сохранение в очередь происходит крайне быстро, да и генерация id быстра – делаем это синхронно, по времени ответа почти незаметно для клиента.

Статус запроса

Что же наш клиент должен делать с полученным идентификатором? Ему то нужен ответ на запрос, а не идентификатор запроса. А клиент должен с некоторой периодичностью проверять статус запроса – для этого посылать на API уже другой запрос с полученным идентификатором.

Сначала, например, статус "в обработке", потом "готов". А может быть и статус ошибки, если с запросом что-то не так, либо же наш сервер где-то сломался. Вообще, статусов может быть великое множество, но базово – "в очереди" и "готов к получению".

Сам же запрос сидит в очереди, пока до него не доберётся уже отдельный обработчик запросов (worker), который уже без ограничений HTTP timeout-а займётся им. По завершении работы, он положит результат в какое-то хранилище, а статус запроса переведёт в "готово".

Получение результата

Когда же наш обработчик подготовил ответ, клиент на очередной запрос статуса получает ответ "готово", и делает уже 3-ий тип запроса – на получение результата.

Сам же результат может быть сохранён в БД, если это какой-то JSON / XML / etc ответ. Или же, как в примере с подготовкой отчёта статистики использования сайта – на файлохранилище в виде .xls / .pdf файла. И в зависимости от этого в "результате" будут либо данные, либо ссылка, либо ещё что-то.

При этом заметьте, не каждом этапе мы вполне можем пользоваться синхронным кодом, само же API будет асинхронным.

Очереди для асинхронной обработки запросов

Помимо возможности создания таких очередей в памяти для обеспечения межпроцессного взаимодействия, определённо стоит упомянуть и специализированные сервера, которые обеспечивают связь нескольких сетевых узлов.

Очереди на базах данных

Если у Вас уже есть приложение, вполне возможно, вы уже используете какую-то базу данных. Пока нагрузка не велика, для организации очереди можно использовать всю ту же базу данных. Чтобы посчитать примерную нагрузку на БД, берём примерное количество сообщений. На создание и забор результата придётся минимум 4 записи:

  1. запись запроса шлюзом API,
  2. запись "забрано в работу таким-то воркером",
  3. запись результата,
  4. удаление после отдачи.

Здесь мы не рассматриваем вопрос кеширования результатов – это отдельная большая и больная тема.

По чтению же сложно что-то обещать в целом, ибо есть ещё запросы статуса, которым может быть заспамлен шлюз. С этим можно бороться лимитами и, например, троттлингом, но опять же – об этом не сегодня.

Итак, не считая запросов статуса:

  1. чтение воркером запроса,
  2. чтение шлюзом результата.

Накинем ещё 1 чтение соседним освободившимся воркером, хотя под нагрузкой такое будет не часто. Ну и плюс 1, если БД не позволяет подписаться на создание новых записей, и вы проверяете наличие новых заданий поллингом. Итого – от 4-х.

Но больнее всего будут всё же записи, ибо нам понадобится создать и поддерживать 1-2 индекса на поля идентификатора запроса и последовательности вставки. Последний можно опустить, если вам порядок забора из "очереди не важен", либо вы сделали хитрый идентификатор.

В общем, это довольно дорогое удовольствие. Можете в интернетах найти benchmark на Вашу БД, либо протестировать самому, например, с помощью pgbench. На вскидку – сотни/тысячи обработанных сообщений в секунду.

Очереди на Redis

Следующая технология, на которую стоит взглянуть – key-value хранилище Redis. И тут нас больше интересует не то, что это бытрое key-value, а 2 фичи, которые есть у Redis.

Первый вариант передачи сообщений – pub-sub (да, реализация шаблона проектирования Publisher-Subsciber). Сообщения будут передаваться на весь кластер, на все подписанные клиенты (worker-ы). Но этот вариант лично мне не очень нравится, ведь сообщения:

  • летят на всех подписанных, а значит сразу несколько могут его взять в работу,
  • если все воркеры отвалились, сообщения пролетят мимо, и никто их не обработает.

А вот второй вариант выглядит вполне красиво: в Redis можно в качестве значения использовать список (linked list), который идеально подходит для очередей.

Если примитивно и на пальцах, то создаём:

  • очередь для входящих сообщений,
  • очередь обрабатываемых,
  • какое-нибудь хранилище для состояния / результатов обработки, например, hash, или же обычный скаляр на каждый запрос.

Порядок обработки будет примерно следующим:

  1. В хранилище шлюз создаёт запись о запросе с ключом – идентификатором.
  2. Запрос помещается в очередь входящих.
  3. Воркер атомарно (командой RPOPLPUSH) переносит запрос из входящих в обрабатываемые.
  4. Когда воркер обработал сообщение – результат в хранилище. Можно прямо в запись с ключом – идентификатором.
  5. Воркер удаляет (помним, что LREM работает за линейное время) запрос из очереди обрабатываемых.

К этому можно ещё прикрутить очередь ошибок, чтобы отдельный сервис следил за воркерами или же очередью обрабатываемых. Ну и переносил сообщения в очередь ошибок, если долго висит / воркер прислал сообщение о перезапуске (упал в процессе обработки).

Каждый пункт – чрезвычайно быстрый запрос к серверу Redis. Более того, часть пунктов можно исключить, обложив Redis скриптами на Lua (поддерживается "из коробки"). Скорее всего, Вы без проблем найдёте готовую библиотеку под Ваш стек технологий.

Из минусов – высокую скорость Redis имеет не "бесплатно". По умолчанию, Redis хранит данные только в памяти. А значит падение / лимит памяти может испортить ситуацию. Возможные варианты решения этой проблемы:

  • Настроить таки сохранения состояния на диск в .rdb файл.
  • Настроить реплицирование в соседний Redis-сервер, да и вообще кластер.

Оба варианта замедлят работу. Не катастрофически, но надо иметь в виду.

Benchmark-и по работе Redis можно легко найти в тех же интернетах. По сравнению с БД результат отличается в лучшую сторону в десятки-сотни раз.

Очереди сообщений на AMQP

Здесь реализаций немало: ActiveMQ, RabbitMQ (субъективно, самая удобная), ZeroMQ... "MQ" – это как раз "message queue" – то есть те самые, нужные нам "очереди сообщений". И это не просто одна из фич, как в Redis, а целый протокол и набор решений именно для организации очередей сообщений. Так что развернуться здесь можно куда шире, сами же решения проще. Далее – частичный пересказ заметки про AMQP на примере.

Итак, нам понадобится:

  • Создать exchange (точка обмена сообщениями / маршрутизатор в логике AMQP).
  • Создать queue (сервер очереди) входящих сообщений под нашу задачу.
  • На exchange связать очередь с нужным routing-key, чтобы exchange знал, куда слать какие сообщения (по сути – адрес назначения).
  • Какое-либо хранилище для результатов.

А порядок обработки будет следующим:

  1. Шлюз создаёт сообщение с нужным routing-key, шлём в exchange.
  2. Воркер из очереди (queue) получает сообщение.
  3. Воркер обрабатывает сообщение, складывает результат в хранилище.
  4. Воркер шлёт ack-ответ в MQ, чтобы та поняла, что всё хорошо и удалила у себя сообщение.

Сервер queue отправит сообщение только одному воркеру, так что проблем с "помечанием" сообщения нет. Если все воркеры отвалились, сервер сообщений (queue) будет хранить сообщения, пока воркеры не оживут. Воркерам не надо опрашивать очередь – устанавливается постоянное TCP соединение, и сама очередь будет слать сообщения по готовности.

При этом exchange и очередь можно довольно гибко настроить. Например:

  • задать как раз ожидание ack,
  • задать durable – хранение сообщений в постоянной памяти (для защиты от потери данных при падении сервера),
  • задать DMQ (dead message queue) для сообщений, которые не получили ACK в нужное время, либо количество попыток отправить превышено.

В общем, только прочитай мануал – всевозможные настройки.

По скорости вполне сравним с Redis. Нормально (не просто) кластеризуется, строятся целые иерархии MQ-серверов. Для системы, где ходят сотни миллионов сообщений в день – сойдёт.

Очередь или всё же журнал сообщений на Kafka

Ещё один вариант доставки сообщения до воркера – Apache Kafka. Но это, опять же, не очередь, а журнал сообщений (commit log). Иными словами, Kafka вообще никак не помогает в поддержании статуса сообщения (доставлено/не доставлено).

Представьте кучу подряд идущих сообщений. В конец сообщения дописываются, из начала удаляются (по старости). Клиент же хранит "указатель" на какое-то место в этой последовательности, по которому читает какое-то количество сообщений, передвигает дальше, снова читает.

Это краткое описание работы с одной партицией – с одним конкурирующим клиентом. Если хотите конкуренцию нескольких клиентов (воркеров) – сделайте несколько этих самых партиций, на каждую посадите по клиенту, а продюсер пусть раскидывает сообщения, например, поочерёдно по партициям.

Отличий от работы с очередями при использовании готовых библиотек особо не будет, кроме того, что можно "заглянуть" в прошлое. Ну и обработка ошибок осуществляется приложением, а не Kafka.

Скорость высокая, масштабируемость отличная.

Проблемы обработки задач с помощью очередей

Всё это, конечно, хорошо, но, на самом деле, нет. Есть множество ограничений у каждого решения. И это даже не вопросы производительности, а вопрос соблюдения определённых гарантий.

Гарантия доставки сообщений

Представьте, что вы отправили сообщение. Сколько раз оно будет принято? Казалось бы, в сферическом вакууме количество отправок должно совпасть с количеством получений... Но вспомните хотя бы ту же "Почту" – вечно что-то теряется. Так и с распределёнными системами – где-то сеть моргнула, где-то сервис упал...

Так что некоторые сервера очередей, протоколы передачи данных реализуют подходы "at-least-once" (доставлено 1 или более раз), "at-most-once" (1 или менее раз). Но, к примеру, при передаче сообщения о списании средств с банковского счёта, очень не хотелось бы попадать на случаи, когда не ровно 1 раз ("exactly-once") было доставлено сообщение.

Сами же подходы at-least-once, at-most-once довольно просты: шлём сообщение, ждём подтверждения доставки / не ждём подтверждения.

Если ждём – такая ситуация всё ломает: сообщение было принято, обработано, но отправить "ack" не смогло... Значит надо перепослать – и уже 2 доставки вместо одной.

Если не ждём – ещё очевидней: все получатели лежат, никто не получил...

Что же нужно делать для "exactly-once"? Добавить адрес отправителя и порядковый номер сообщения в заголовок. После чего работать равно как и при "at-least-once".

Единственное отличие – на клиенте проверять, не было ли ещё от этого отправителя сообщения с таким порядковым номером. Так с давних времён поступает TCP для транспорта потоков данных по сети. Так поступили и в той же Kafka.

Но тут есть опять же проблема – маршрут сообщений (отправитель - получатель) не должен меняться во время потока передачи данных.

Обработка ошибок доставки и клиента

Как мы уже договорились ранее, сеть моргает, сервисы падают. Как нам жить в такой неспокойной среде? Даже не так. Как сервер очередей может понять, что что-то пошло не так?

Можно, конечно, прикрутить к очередям ещё и возможность лазить по воркерам, смотреть dmesg, в целом интегрировать системы мониторинга. Но делов...

Пожалуй, самые распространённые подходы к определению, что "что-то пошло не так" – это TTL и MAX_RETRIES. То есть, если за определённое время так и не пришёл ответ об успешной обработке сообщения – пытаемся какое-то количество раз перепослать это сообщение. Если превышено максимальное количество повторных отправок – всё плохо.

Ну а что делать, если мы определили, что "плохо" – я выше описывал в списке реализаций очередей. То есть, помещаем в отдельную очередь ошибок, чтобы разобраться в причинах и перепослать всё по-новой.

Важно не забыть проверить, что система настроена так, чтобы "плохие" сообщение не просто выкидывались, а попадали в какой-то "dead message queue".

Порядок доставки сообщений, порядок доставки результата

Ещё одна проблема, которая встаёт при организации систем доставки сообщений – их порядок. К примеру, если вы пишете чат-комнату, то было бы неплохо, чтобы клиенты получили сообщения в порядке их написания.

Как раз одна из причин, по которой для доставки сообщений используются очереди – это их принцип "FIFO" (первый вошедший выйдет первым), что соответствует житейскому пониманию справедливой очереди.

В простой модели, где очередь одна и получатель один (воркер) – всё просто.

Теперь представим, что сообщения мы получаем почему-то из нескольких очередей. Будет ли доставка последовательна? Кто позаботился о том, чтобы сообщения были пронумерованы в порядке появления их в этом мире?

Теперь про доставку результата. Есть одна очередь, её конкурентно обрабатывают несколько воркеров. С одной ли скоростью они обрабатывают каждое сообщение? Когда они отдадут результат куда-то – будет ли порядок ответов соответствовать порядку запросов?

И тут надо понять – важно ли это. И если да – то придётся ставить ещё несколько ограничений и звено в цепи обработки, которое будет буферизировать и перетасовывать при необходимости. Либо же это как-то заложить в протокол обмена сообщениями на уровне логики приложения.

На самом деле, существует куда больше проблем, да и технических решений. Сюда и ACID транзакций, и CAP в целом для распределённых систем, и прочие занимательные проблемы. Хотелось базово описать, как живёт асинхронность на уровне межсистемного взаимодействия, за счёт чего, и какие неочевидные проблемы там можно найти. Надеюсь, это кому-то поможет.