Брокеры сообщений что это
Брокеры сообщений что это
Apache Kafka – не единственный программный брокер сообщений и система управления очередями, используемая в высоконагруженных Big Data проектах. Кафка часто сравнивают с другим популярным продуктом аналогичного назначения – RabbitMQ. В сегодняшней статье мы рассмотрим, чем похожи и чем отличаются Apache Kafka и RabbitMQ, а также поговорим о том, что следует выбирать в конкретных случаях для практического применения.
Что такое RabbitMQ и как он работает
RabbitMQ – это программный брокер сообщений на основе стандарта AMQP, написанный на языке Erlang и состоящий из следующих основных компонентов [1]:
RabbitMQ поддерживает несколько языков программирования (Perl, Python, Ruby, PHP), а также обеспечивает горизонтальное масштабирование для построения кластерных решений [1]. Поэтому RabbitMQ, который неформально называют «Кролик», довольно часто применяется в различных Big Data проектах. Однако, в связи с некоторыми его технологическими особенностями реализации, он не является полноценной заменой Apache Kafka.
В упрощенном виде управление сообщениями выполняется в RabbitMQ следующим образом [3]:
Сходства Apache Kafka и RabbitMQ
Несмотря на принципиальные отличия этих систем обмена сообщениями, между ними довольно много общих моментов [4]:
Чем Кафка отличается от Кролика
Основные отличия Apache Kafka и RabbitMQ обусловлены принципиально разными моделями доставки сообщений, реализуемыми в этих системах. В частности, Kafka действует по принципу вытягивания (pull), когда получатели (consumers) сами достают из топика (topic) нужные им сообщения. RabbitMQ, напротив, реализует модель проталкивания, отправляя необходимые сообщения получателям. В связи с этим Кафка отличается от Кролика по следующим критериям [3]:
Кафка или Кролик: что выбирать для Big Data проекта
Обратной стороной широких и разнообразных возможностей RabbitMQ по гибкому управлению очередями сообщений (маршрутизация, шаблоны доставки, мониторинг получения) является повышенное потребление ресурсов и, соответственно, снижение производительности в условиях увеличенных нагрузок. А, поскольку именно такой режим работы характерен для Big Data систем, то в большинстве случаев Apache Kafka является наилучшим средством для управления сообщениями. Например, в случае сбора и агрегации множества событий от IoT-устройств, клиентских метрик, лог-файлов и аналитики Big Data с перспективой увеличения источников информации понадобится Кафка. А если необходим быстрый сообщениями между несколькими сервисами, RabbitMQ отлично справится с этой задачей [5]. Кролик можно использовать для обработки событий в режиме реального времени, т.е. этот брокер – решение только для реагирования на события, которые происходят сейчас. Кафка, напротив, обеспечивает полную историческую достоверность и сохранность всех данных, а также упрощает их распространение. Исходные данные принадлежат только отправителю, но каждый получатель может их фильтровать, трансформировать, дополнять данными из других источников и сохранять в собственных базах данных [6].
Подводя итог сравнению Apache Kafka и RabbitMQ, можно сделать вывод, что выбор того или иного брокера в первую очередь зависит от нагрузки, в которой предполагается его использование. В случае адекватного применения каждая из этих систем обмена сообщениями будет эффективным инструментом реализации Big Data проекта.
Краткое сравнение брокеров сообщений Apache Kafka и RabbitMQ
О другой популярной альтернативе Kafka, Apache Pulsar, читайте здесь. А еще больше прикладных сведений по Кафка и других технологиях больших данных на практических курсах в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов, для руководителей, архитекторов, инженеров, администраторов, аналитиков Big Data и Data Scientist’ов в Москве:
Kafka, RabbitMQ или AWS SNS/SQS: какой брокер выбрать?
Четкая работа микросервисных приложений в значительной степени зависит от передачи сообщений и асинхронных операций.
Правильный выбор брокера сообщений — это одно из первых важных решений, которое потребуется принять при разработке взаимодействующих сервисов. Поиск «правильного» решения может превратиться в мучительное сравнение функций и пограничных вариантов, которые мало отличаются друг от друга.
В этой статье я бы хотел немного прояснить ситуацию и рассказать о нескольких наиболее известных брокерах сообщений. Я рассмотрю задачи, под которые они разрабатывались, используемую в них общую модель обмена сообщениями и постараюсь помочь с выбором оптимального варианта.
Apache Kafka
Kafka — это брокер сообщений с открытым исходным кодом, который был разработан и сейчас поддерживается в первую очередь фондом Apache Software Foundation при содействии сообщества разработчиков приложений с открытым кодом.
Основные характеристики
Акцент на потоковом контенте, работа с большими потоками данных.
Основные возможности: обеспечение сохранности сообщений и их многократная повторная обработка.
Хостинг на месте и поддержка сторонних модулей.
Kafka обеспечивает оптимизированную потоковую обработку событий, при этом связь между потребителями реализуется по модели «Публикации — подписки». События могут быть разбиты на топики — эта возможность позволяет лучше организовать схемы взаимодействия распределенного приложения. Кроме того, события делятся между несколькими серверами в пределах одного кластера — благодаря этому достигается высокая устойчивость и производительность системы.
Технические особенности развертывания
Apache предлагает SDK на нескольких языках.
Kafka предназначен для развертывания на месте в рамках вашей собственной архитектуры. Это может быть группа отдельных серверов, виртуальная машина или Docker-контейнер.
Многие компании предлагают хостинг Kafka в качестве услуги (например, AWS, CloudKarafka и Aiven) или на их виртуальных машинах.
Ниже приведен пример кода на JavaScript для начала работы с событиями Apache Kafka.
Преимущества и недостатки
Kafka ориентирован на высокую пропускную способность потока данных, это видно по его статистике производительности.
Kafka заточен на обработку потоков данных, поэтому получается высокопроизводительная система, которая выполняет сложную обработку больших объемов данных. Функционал маршрутизации этих потоков относительно ограничен по сравнению с другими брокерами сообщений. Однако этот разрыв постоянно уменьшается по мере совершенствования продуктов.
Таким образом, Kafka представляет собой надежный высокопроизводительный отказоустойчивый продукт для потоковой обработки сообщений, который дает вам полный контроль над поведением приложения. В зависимости от доступной пропускной способности канала и ресурсов можно выбрать комфортную для вас степень абстракции хостинга, и Kafka будет надежным вариантом, который способен масштабироваться вместе с вашим трафиком.
RabbitMQ
RabbitMQ — это еще один брокер сообщений с открытым кодом. Первоначальным разработчиком была компания Rabbit Technologies, но в результате ряда приобретений продукт перешел в собственность VMware.
Основные характеристики
Акцент на обмене сообщениями с возможностью поддержки больших потоков данных.
Основная особенность — расширенный функционал маршрутизации.
Хостинг на месте и поддержка сторонних модулей.
RabbitMQ также использует модель «Публикации — подписки», отправляя объекты сообщений в двоичной форме в различные именованные очереди, которые могут динамически создаваться и уничтожаться.
Он может работать как автономно, так и в составе кластера, позволяет наращивать достаточную мощность для достижения любой степени избыточности или безопасности данных.
Технические особенности развертывания
Для RabbitMQ существует несколько клиентских библиотек на множестве языков.
Он может быть развернут на месте: на полноценном сервере, в контейнере или на одном из нескольких облачных хостингов.
Следующий код на Node.js с пакетом AMQPLIB иллюстрирует простейший пример работы с RabbitMQ.
Преимущества и недостатки
RabbitMQ способен справляться практически с любыми нагрузками и может эффективно масштабироваться вместе с вашим приложением по мере роста базы пользователей.
Благодаря акценту на доставке сообщений и сложных сценариях маршрутизации RabbitMQ легко адаптируется к любой архитектуре приложений. Хотя изначально обработка потоков данных не поддерживалась и сообщения, как правило, обрабатывались однократно без возможности повторной обработки, эти слабые стороны были устранены по мере развития RabbitMQ.
RabbitMQ может выполнять любую подходящую роль в инфраструктуре вашего приложения благодаря тому, что позволяет контролировать все необходимое и передавать остальное на сторону.
Amazon Web Services (AWS) SQS/SNS
SNS и SQS представляют собой примеры двух разных подходов к распределенному обмену сообщениями.
SNS в значительной степени ориентирован на доставку сообщений. С помощью модели «Публикации — подписки» он позволяет быстро передавать сообщения множеству клиентов, например мобильным устройствам, конечным точкам HTTPS или другим сервисам AWS.
SQS, напротив, приоритетом ставит успешную доставку и обработку сообщений отдельными клиентами.
Основные характеристики
Возможность передавать широковещательные сообщения и работать по модели «Публикации — подписки».
Быстрая настройка с помощью AWS.
Отсутствие хостинга за пределами AWS.
SNS транслирует одно и то же сообщение множеству получателей, а SQS распределяет организованные в очередь сообщения среди отдельных подписчиков.
В SNS применяется режим push-уведомлений, который позволяет автоматизировать ответы. SQS больше ориентирован на механизм опроса с поддержкой некоторых дополнительных функций, управляемых событиями.
Технические особенности
AWS предлагает общий SDK с доступом к большинству сервисов AWS (включая SQS и SNS) на нескольких популярных языках.
Ниже приведен код, который демонстрирует работу с сервисами SNS и SQS при помощи SDK AWS.
Преимущества и недостатки
При совместном использовании AWS SQS и SNS могут стать основой масштабируемого надежного распределенного приложения. Благодаря интеграции со множеством других сервисов AWS (например, Lambda) эти два инструмента могут помочь с легкостью расширить коммуникационные возможности вашего приложения и предоставить обширный инструментарий для разрешения проблем взаимодействия между сервисами.
Если ваше веб-приложение уже работает в инфраструктуре AWS, на настройку фактически не придется тратить время и сложностей будет существенно меньше, чем у множества других систем. Потенциально это удобство обернется счетом за услуги AWS, который будет расти по мере увеличения количества сообщений.
В отличие от Kafka и RabbitMQ, которые не ограничивают размер сообщений по умолчанию, AWS устанавливает некоторые ограничения для сообщений SQS и SNS и после достижения определенного размера преобразует сообщения в объекты S3.
Мы публиковали подробную статью о том, как можно преодолеть это ограничение по размеру. Рекомендую ознакомиться с ней, чтобы понять, как SQS работает с большими сообщениями.
https://www.aspecto.io/blog/how-to-send-large-sqs-sns-messages-with-node-js/
Так как SQS и SNS построены в соответствии с принципом Cloud First, дополнительная сложность их использования вызвана привязкой к конкретному провайдеру. Другие брокеры сообщений избавлены от этого благодаря возможности локальной установки и сопровождения.
Выбор подходящего брокера сообщений
При выборе брокера вам следует принять во внимание два фактора:
Фактор 1: тип отправляемых сообщений
Первым делом при выборе брокера нужно определить, какие сообщения вы будете отправлять и в каком формате. Именно особенности ваших сообщений определят, на что следует обращать внимание при оценке возможностей каждой платформы. Платформы будут приблизительно похожи по своему функционалу — в том смысле, что каждый из продуктов имеет все необходимое для поддержки масштабируемого распределенного приложения. С точки зрения чисто функциональных возможностей все решения хороши.
Фактор 2: характер вашей ежедневной деятельности и инфраструктура приложений
Тут свою роль могут сыграть второстепенные обстоятельства. Подумайте о своей ежедневной работе и своих системах и задайте себе следующие вопросы:
Вы создаете приложение только в AWS? Возможно, для взаимодействия между службами будет целесообразнее использовать SQS и SNS.
Вам больше нравится работать над своим приложением, чем заниматься вопросами передачи данных между его компонентами? Тогда сторонний брокер может быть оптимальным вариантом, который позволит сосредоточиться на ваших сильных сторонах и нарастить вашу базу кода.
Вам требуются скорость доставки и минимальная задержка? Тогда Kinesis — то, что вам нужно (рассмотрим его в другой статье, так что следите за обновлениями). При этом для приложения, которое ориентировано на гарантированную доставку и избыточность, может потребоваться другая технология.
На этом уровне ваш выбор зависит от требований инфраструктуры приложения и характера работы.
С учетом вышесказанного и с оговоркой, что свести эти сложные технологические системы к рекомендациям в несколько строк — трудно и несколько несправедливо, дам некоторые советы по правильному выбору брокера сообщений.
Если для вас важны сохранность сообщений и возможность многократной повторной обработки — ваш выбор должен пасть на Kafka.
Если вас больше волнует возможность поддерживать и внедрять сложный набор правил маршрутизации — вам лучше всего подойдет RabbitMQ.
Если у вас небольшой стартап и вы хотите быстрее приступить к работе с минимальными накладными расходами — для вас отличным вариантом станут AWS SQS и SNS, учитывая их быструю настройку и структуру расходов.
Сквозная видимость в процессе обмена сообщениями
Один из аспектов, который следует оценить, — это оптимальный способ сопровождения конечного продукта. Как определить место возникновения ошибки после того, как приложение начнет отправлять сообщения и произойдет сбой?
OpenTelemetry представляет собой набор SDK и инструментов для обеспечения наблюдаемости распределенного приложения и позволяет устранять неполадки в распределенном обмене сообщениями. Краткое пошаговое руководство содержит инструкции по внедрению OpenTelemetry в распределенные приложения для сквозной видимости передаваемых сообщений. В примере в качестве брокера сообщений используется Kafka.
По ссылке далее руководства можно загрузить инструмент OpenTelemetry Kafkajs Instrumentation для Node.js
https://www.aspecto.io/blog/how-to-achieve-end-to-end-microservices-visibility-in-asyn-messaging-with-opentelemetry/
Выводы
Если вы создаете приложение, которое имеет тот или иной «распределенный» аспект, то велики шансы, что вам в какой-то момент потребуется обрабатывать асинхронное взаимодействие между его компонентами.
Сообщения и брокеры, которые их доставляют, будут критически важны в инфраструктуре, которая управляет вашим приложением.
Этот обзор не является исчерпывающим — мне потребовалась бы еще тысяча слов, чтобы начать полноценный рассказ о брокерах сообщений, но надеюсь, обзор содержит полезную информацию, которая пригодится вам при принятии решения. Главное — до конца понимать потребности вашего приложения и то, как эти потребности соответствуют возможностям брокеров сообщений, которые вы рассматриваете. В конечном счете вопрос выбора брокера сообщений не имеет «неправильного» ответа, но хочется верить, что эта статья поможет вам сделать шаг в правильном направлении.
Связанные репозитории GitHub
Библиотека производителей и потребителей SQS/SNS. Дает возможность передавать полезные нагрузки через S3.
Асинхронное взаимодействие. Брокеры сообщений. Apache Kafka
Данная публикация предназначена для тех, кто интересуется устройством распределенных систем, брокерами сообщений и Apache Kafka. Здесь вы не найдете эксклюзивного материала или лайфхаков, задача этой статьи – заложить фундамент и рассказать о внутреннем устройстве упомянутого брокера. Таким образом, в следующих публикациях мы сможем делать ссылки на данную статью, рассказывая о более узкоспециализированных темах.
Привет! Меня зовут Дмитрий Шеламов и я работаю в Vivid.Money на должности backend-разработчика в отделе Customer Care. Наша компания – европейский стартап, который создает и развивает сервис интернет-банкинга для стран Европы. Это амбициозная задача, а значит и ее техническая реализация требует продуманной инфраструктуры, способной выдерживать высокие нагрузки и масштабироваться согласно требованиям бизнеса.
В основе проекта лежит микросервисная архитектура, которая включает в себя десятки сервисов на разных языках. В их числе Scala, Java, Kotlin, Python и Go. На последнем я пишу код, поэтому практические примеры, приведенные в этой серии статей, будут задействовать по большей части Go (и немного docker-compose).
Работа с микросервисами имеет свои особенности, одна из которых – организация коммуникаций между сервисами. Модель взаимодействия в этих коммуникациях бывает синхронной или асинхронной и может оказать существенное влияние на производительность и отказоустойчивость системы в целом.
Асинхронное взаимодействие
Итак, представим что у нас есть два микросервиса (А и Б). Будем считать, что коммуникация между ними осуществляется через API и они ничего не знают о внутренней реализации друг друга, как и предписывает микросервисный подход. Формат передаваемых между ними данных заранее оговорен.
Задача перед нами стоит следующая: нам нужно организовать передачу данных от одного приложения к другому и, желательно, с минимальными задержками.
В самом простом случае поставленная задача достигается синхронным взаимодействием, когда А отправляет приложению Б запрос, после чего сервис Б его обрабатывает и, в зависимости от того, успешно или не успешно был обработан запрос, отправляет некоторый ответ сервису А, который этот ответ ожидает.
Если же ответ на запрос так и не был получен (например, Б рвет соединение до отправки ответа или А отваливается по таймауту), сервис А может повторить свой запрос к Б.
С одной стороны, такая модель взаимодействия дает определенность статуса доставки данных для каждого запроса, когда отправитель точно знает, были ли получены данные получателем и какие дальнейшие действия ему необходимо делать в зависимости от ответа.
С другой стороны, плата за это – ожидание. После отправки запроса сервис А (или поток, в котором выполняется запрос) блокируется до того момента, пока не получит ответ или не сочтет запрос неудавшимся согласно своей внутренней логике, после чего примет дальнейшие действия.
Проблема не только в том, что ожидание и простой имеют место быть, – задержки в сетевом взаимодействии неизбежны. Основная проблема заключается в непредсказуемости этой задержки. Участники коммуникации в микросервисном подходе не знают подробностей реализации друг друга, поэтому для запрашивающей стороны не всегда очевидно, обрабатывается ли ее запрос штатно или нужно переотправить данные.
Все, что остается А при такой модели взаимодействия – это просто ждать. Может быть наносекунду, а может быть час. И эта цифра вполне реальна в том случае, если Б в процессе обработки данных выполняет какие-либо тяжеловесные операции, вроде обработки видео.
Возможно, вам проблема не показалась существенной – одна железка ждет пока другая ответит, велика ли потеря?
Чтобы сделать эту проблему более личной, представим, что сервис А – это приложение, запущенное на вашем телефоне, и пока оно ожидает ответ от Б, вы видите на экране анимацию загрузки. Вы не можете продолжить пользоваться приложением до тех пор, пока сервис Б не ответит, и вынуждены ждать. Неизвестное количество времени. При том, что ваше время гораздо ценнее, чем время работы куска кода.
Подобные шероховатости решаются следующим образом – вы разделяете участников взаимодействия на два “лагеря”: одни не могут работать быстрее, как бы вы их ни оптимизировали (обработка видео), а другие не могут ждать дольше определенного времени (интерфейс приложения на вашем телефоне).
Затем вы заменяете cинхронное взаимодействие между ними (когда одна часть вынуждена ждать другую, чтобы удостовериться, что данные были доставлены и обработаны сервисом-получателем) на асинхронное, то есть модель работы по принципу отправил и забыл – в этом случае сервис А продолжит свою работу, не дожидаясь ответа от Б.
Но как в этом случае гарантировать то, что передача прошла успешно? Вы же не можете, допустим, загрузив видео на видеохостинг, вывести пользователю сообщение: «ваше видео может быть обрабатывается, а может быть и нет», потому что сервис, занимающийся загрузкой видео, не получил от сервиса-обработчика подтверждение, что видео дошло до него без происшествий.
В качестве одного из решений данной проблемы мы можем добавить между сервисами А и Б прослойку, которая будет выступать временным хранилищем и гарантом доставки данных в удобном для отправителя и получателя темпе. Таким образом мы сможем расцепить сервисы, синхронное взаимодействие которых потенциально может быть проблемным:
Однако выбор СУБД в качестве инструмента для обмена данными может привести к проблемам с производительностью с ростом нагрузки. Причина в том, что большинство баз данных не предназначены для такого сценария использования. Также во многих СУБД отсутствует возможность разделения подключенных клиентов на получателей и отправителей (Pub/Sub) – в этом случае, логика доставки данных должна быть реализована на клиентской стороне.
Вероятно, нам нужно нечто более узкоспециализированное, чем база данных.
Брокеры сообщений
Брокер сообщений (очередь сообщений) – это отдельный сервис, который отвечает за хранение и доставку данных от сервисов-отправителей к сервисам-получателям с помощью модели Pub/Sub.
Эта модель предполагает, что асинхронное взаимодействие осуществляется согласно следующей логике двух ролей:
Очередь можно представить как канал связи, натянутый между писателем и читателем. Писатели кладут сообщения в очередь, после чего они “проталкиваются” (push) читателям, которые подписаны на эту очередь. Один читатель получает одно сообщение за раз, после чего оно становится недоступно другим читателям.
Под сообщением же подразумевается единица данных, обычно состоящая из тела сообщения и метаданных брокера.
В общем случае, тело представляет из себя набор байт определенного формата.
Получатель обязательно должен знать этот формат, чтобы после получения сообщения иметь возможность десериализовать его тело для дальнейшей обработки.
Использовать можно любой удобный формат, однако, важно помнить об обратной совместимости, которую поддерживают, например, бинарный Protobuf и фреймворк Apache Avro.
По такому принципу работает большинство брокеров сообщений, построенных на AMQP (Advanced Message Queuing Protocol) – протоколе, который описывает стандарт отказоустойчивого обмена сообщениями посредством очередей.
Данный подход обеспечивает нам несколько важных преимуществ:
At least once, напротив, гарантирует получение сообщения получателем, однако при этом есть вероятность повторной обработки одних и тех же сообщений.
Зачастую эта гарантия достигается с помощью механизма Ack/Nack (acknowledgement/negative acknowledgement), который предписывает совершать переотправку сообщения, если получатель по какой-то причине не смог его обработать.
Таким образом, для каждого отправленного брокером (но еще не обработанного) сообщения существует три итоговых состояния — получатель вернул Ack (успешная обработка), вернул Nack (неуспешная обработка) или разорвал соединение. Последние два сценария приводят в переотправке сообщения и повторной обработке.
Однако брокер может произвести повторную отправку и при успешной обработке сообщения получателем. Например, если получатель обработал сообщение, но завершил свою работу, не отправив сигнал Ack брокеру.
В этом случае брокер снова положит сообщение в очередь, после чего оно будет обработано повторно, что может привести к ошибкам и порче данных, если разработчик не предусмотрел механизм устранения дублей на стороне получателя.
Стоит отметить, что существует еще одна гарантия доставки, которая называется “exactly once”. Ее трудно достичь в распределенных системах, но при этом она же является наиболее желаемой.
В этом плане, Apache Kafka, о которой мы будем говорить далее, выгодно выделяется на фоне многих доступных на рынке решений. Начиная с версии 0.11, Kafka предоставляет гарантию доставки exactly once в пределах кластера и транзакций, в то время как AMQP-брокеры таких гарантий предоставить не могут. Транзакции в Кафке – тема для отдельной публикации, сегодня же мы начнем со знакомства с Apache Kafka.
Apache Kafka
Мне кажется, что будет полезно для понимания начать рассказ о Кафке со схематичного изображения устройства кластера.
Отдельный сервер Кафки именуется брокером. Брокеры образуют собой кластер, в котором один из этих брокеров выступает контроллером, берущим на себя некоторые административные операции (помечен фиолетовым).
За выбор брокера-контроллера, в свою очередь, отвечает отдельный сервис – ZooKeeper, который также осуществляет service discovery брокеров, хранит конфигурации и принимает участие в распределении новых читателей по брокерам и в большинстве случаев хранит информацию о последнем прочитанном сообщении для каждого из читателей. Это важный момент, изучение которого требует опуститься на уровень ниже и рассмотреть, как отдельный брокер устроен внутри.
Commit log
Структура данных, лежащая в основе Kafka, называется commit log или журнал фиксации изменений.
Новые элементы, добавляемые в commit log, помещаются строго в конец, и их порядок после этого не меняется, благодаря чему в каждом отдельном журнале элементы всегда расположены в порядке их добавления.
Свойство упорядоченности журнала фиксаций позволяет использовать его, например, для репликации по принципу eventual consistency между репликами БД: в них хранят журнал изменений, производимых над данными в мастер-ноде, последовательное применение которых на слейв-нодах позволяет привести данные в них к согласованному с мастером виду.
В Кафке эти журналы называются партициями, а данные, хранимые в них, называются сообщениями.
Что такое сообщение? Это основная единица данных в Kafka, представляющая из себя просто набор байт, в котором вы можете передавать произвольную информацию – ее содержимое и структура не имеют значения для Kafka. Сообщение может содержать в себе ключ, так же представляющий из себя набор байт. Ключ позволяет получить больше контроля над механизмом распределения сообщений по партициям.
Партиции и топики
Почему это может быть важно? Дело в том, что партиция не является аналогом очереди в Кафке, как может показаться на первый взгляд. Я напомню, что формально очередь сообщений – это средство для группирования и управления потоками сообщений, позволяющее определенным читателям подписываться только на определенные потоки данных.
Так вот в Кафке функцию очереди выполняет не партиция, а topic. Он нужен для объединения нескольких партиций в общий поток. Сами же партиции, как мы сказали ранее, хранят сообщения в упорядоченном виде согласно структуре данных commit log. Таким образом, сообщение, относящееся к одному топику, может хранится в двух разных партициях, из которых читатели могут вытаскивать их по запросу.
Следовательно, единицей параллелизма в Кафке выступает не топик (или очередь в AMQP брокерах), а партиция. За счет этого Кафка может обрабатывать разные сообщения, относящиеся к одному топику, на нескольких брокерах одновременно, а также реплицировать не весь топик целиком, а только отдельные партиции, предоставляя дополнительную гибкость и возможности для масштабирования в сравнении с AMQP брокерами.
Pull и Push
Обратите внимание, что я не случайно использовал слово “вытаскивает” по отношению к читателю.
В описанных ранее брокерах доставка сообщений осуществляется путем их проталкивания (push) получателям через условную трубу в виде очереди.
В Кафке процесса доставки как такового нет: каждый читатель сам ответственен за вытягивание (pull) сообщений из партиций, которые он читает.
Производители, формируя сообщения, прикрепляют к нему ключ и номер партиции. Номер партиции может быть выбран рандомно (round-robin), если у сообщения отсутствует ключ.
Если вам нужен больший контроль, к сообщению можно прикрепить ключ, а затем использовать hash-функцию или написать свой алгоритм, по которому будет выбираться партиция для сообщения. После формирования, производитель отправляет сообщение в Кафку, которая сохраняет его на диск, помечая, к какой партиции оно относится.
Каждый получатель закреплен за определенной партицией (или за несколькими партициями) в интересующем его топике, и при появлении нового сообщения получает сигнал на вычитывание следующего элемента в commit log, при этом отмечая, какое последнее сообщение он прочитал. Таким образом при переподключении он будет знать, какое сообщение ему вычитать следующим.
Какие преимущества имеет данный подход?
Недостатки
К недостаткам данного подхода можно отнести работу с проблемными сообщениями. В отличие от классических брокеров, битые сообщения (которые не удается обработать с учетом существующей логики получателя или из-за проблем с десериализацей) нельзя бесконечно перезакидывать в очередь, пока получатель не научится их корректно обрабатывать.
В Кафке по умолчанию вычитывание сообщений из партиции останавливается, когда получатель доходит до битого сообщения, и до тех пор, пока оно не будет пропущено и закинуто в “карантинную” очередь (также именуемой “dead letter queue”) для последующей обработки, чтение партиции продолжить не получится.
Также в Кафке сложнее (в сравнении с AMQP-брокерами) реализовать приоритет сообщений. Это напрямую вытекает из того факта, что сообщения в партициях хранятся и читаются строго в порядке их добавления. Один из способов обойти данное ограничение в Кафке – создать нескольких топиков под сообщения с разным приоритетом (отличаться топики будут только названием), например, events_low, events_medium, events_high, а затем реализовать логику приоритетного чтения перечисленных топиков на стороне приложения-консьюмера.
Еще один недостаток данного подхода связан тем, что необходимо вести учет последнего прочитанного сообщения в партиции каждым из читателей. В силу простоты структуры партиций, эта информация представлена в виде целочисленного значения, именуемого offset (смещение). Оффсет позволяет определить, какое сообщение в данный момент читает каждый из читателей. Ближайшая аналогия оффсета — это индекс элемента в массиве, а процесс чтения похож на проход по массиву в цикле с использованием итератора в качестве индекса элемента.
Однако этот недостаток нивелируется за счет того, что Kafka, начиная с версии 0.9, хранит оффсеты по каждому пользователю в специальном топике __consumer_offsets (до версии 0.9 оффсеты хранились в ZooKeeper).
К тому же, вести учет оффсетов можно непосредственно на стороне получателей.
Также усложняется и масштабирование: напомню, что в AMQP брокерах для того, чтобы ускорить обработку потока сообщений, нужно просто добавить несколько экземпляров сервиса-читателя и подписать их на одну очередь, при этом не требуется вносить никаких изменений в конфигурации самого брокера.
Однако в Кафке масштабирование происходит несколько сложнее, чем в AMQP брокерах. Например, если вы добавите еще один экземпляр читателя и натравите его на ту же партицию, вы получите нулевой КПД, так как в этом случае оба экземпляра будут читать один и тот же набор данных.
Поэтому базовое правило масштабирования Кафки — количество конкурентных читателей (то бишь группа сервисов, реализующих одинаковую логику обработки (реплик)) топика не должно превышать количество партиций в этом топике, иначе какая-то пара читателей будут обрабатывать одинаковый набор данных.
Consumer Group
Чтобы избежать ситуации с чтением одной партиции конкурентными читателями, в Кафке принято объединять несколько реплик одного сервиса в consumer Group, в рамках которого Zookeeper будет назначать одной партиции не более одного читателя.
Так как читатели привязываются непосредственно к партиции (при этом читатель обычно ничего не знает о количестве партиций в топике), ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, чтобы каждая партиция имела одного и только одного читателя.
Читатель обозначает свою Consumer Group при подключении к Kafka.
Но здесь мы можем столкнуться с другой проблемой, порожденной тем, что Кафка использует структуру из топиков и партиций. Я напомню, что Кафка не гарантирует упорядоченность сообщений в рамках топика, только в рамках партиции, что может оказаться критичным, например, при формировании отчетов о действиях по пользователю и отправке их в хранилище as is.
Чтобы решить эту проблему, мы можем пойти от обратного: если все события, относящиеся к одной сущности (например, все действия относящиеся к одному user_id), будут всегда добавляться в одну и ту же партицию, они будут упорядочены в рамках топика просто потому, что находятся в одной партиции, порядок внутри которой гарантирован Кафкой.
Для этого нам и нужен ключ у сообщений: например, если мы будем использовать для выбора партиции, в которую будет добавлено сообщение, алгоритм, вычисляющий хэш от ключа, то сообщения с одинаковым ключом будут гарантированно попадать в одну партицию, а значит и вытаскивать получатель сообщения с одинаковым ключом в порядке их добавления в топик.
В кейсе с потоком событий о действиях пользователей ключом партицирования может выступать user_id.
Retention Policy
Теперь пришло время поговорить о Retention Policy.
Это настройка, которая отвечает за удаление сообщений с диска при превышении пороговых значений даты добавления (Time Based Retention Policy) или занимаемого на диске пространства (Size Based Retention Policy).
Compaction Policy
Еще одним способом оптимизации объема, занимаемого на диске, может быть использование Compaction Policy – эта настройка позволяет хранить только последнее сообщение по каждому ключу, удаляя все предыдущие сообщения. Это может быть полезно, когда нас интересует только последнее изменение.