Практичное и интересное приложение обработки событий

Это сообщение в блоге немного устарело; С тех пор была добавлена ​​еще одна служба наблюдения, так что существует отдельная служба, которая сопоставляет пользователей с «персонализированными предупреждениями о сделках», и другая, которая обрабатывает события, связанные с этими парами, и создает уведомления. Сообщение будет скоро обновлено, чтобы отразить эти изменения.

Наша компания, помимо прочего, создает и управляет платформой для поиска, исследования и сравнения продуктов BestPrice.gr. (с тех пор он был выделен в отдельное юридическое лицо). Есть несколько забавных проблем, связанных с созданием и запуском таких сервисов электронной коммерции, которые охватывают IR (поиск, рекомендации и т. Д.), Обработку больших данных (машинное обучение, вероятностные модели и т. Д.), Взаимодействие с пользователем и т. Д.

Я подумал, что было бы полезно описать, как работает одна из множества различных функций / функций, которые мы предоставляем, потому что это подчеркивает, как разделение и потоки могут помочь организациям создавать приложения и быстро масштабироваться с небольшими усилиями.

Пользователи могут подписаться на получение уведомлений о продуктах на BestPrice, которые классифицируются как «сделки». Они выбирают, какие категории товаров им интересны (видеоигры, мобильные телефоны, одежда, обувь, книги и т. Д.), Минимальную нижнюю границу цены в процентах (т. Е. Если цена продукта упадет на 20% или более) и как получать уведомления об этих событиях (по электронной почте, через Slack и через push-уведомления Google Chrome). Это довольно полезная функция, если можно так выразиться, и она очень популярна среди наших пользователей.

Мы сотрудничаем с почти 1700 продавцами, и мы периодически (в зависимости от конфигурации каждого продавца и других бизнес-правил, частота варьируется от 15 минут до 2 часов, с различными исключениями) получаем от них фиды продуктов через систему, специально созданную для загрузки фидов. одновременно и хранить их локально для последующей обработки. Примерно каждые 15 минут запускается другая служба, которая проверяет наличие доступных обновлений, обрабатывает их, объединяет обновления с текущими наборами данных, обновляет индексы Trinity и многое другое. Он также генерирует множество событий, среди прочего, для всех продуктов, в которых мы обнаружили изменения в их свойствах (цене, доступности и т. Д.) С момента последнего сеанса индексации. Мы используем ТАНК для хранения событий в разделах, организованных по темам.

Всякий раз, когда события об изменении свойств продукта производятся в определенной теме TANK, многие (десятки) различных сервисов потребляют эти обновления и действуют в соответствии с ними самыми разными и полезными способами. Это также верно и для других типов мероприятий по другим темам. У нас может быть около 100 различных сервисов, каждый из которых отслеживает, как правило, 1, но часто до 10 различных тем для обновлений, и эти сервисы, в свою очередь, могут создавать (генерировать) больше событий, которые позже будут обрабатываться большим количеством «потребителей», и так далее.

Один из тех сервисов, которые следят за темой обновлений свойств, отвечает за выявление сделок. Он рассматривает все обновленные продукты и, основываясь на модели, основанной на различных эвристиках, с учетом истории цен продукта и прогноза ценовой траектории продукта, он может перевести продукт в «сделку» или отказаться от существующих сделок (т. Е. продукт больше не считается сделкой). Эти различные вычисленные переходы (идентификатор продукта, сделка / нет сделки) производятся в другой теме TANK, которая используется для отслеживания переходов сделок.

Другая служба, которая отслеживает эту тему, обрабатывает все эти обновления. Для продуктов, которые больше не помечаются как сделки, он определяет пользователей, которые были уведомлены о них, и удаляет соответствующие уведомления из их временной шкалы уведомлений (BestPrice имеет все виды социальных функций, а также отображает все сделки, о которых вы были уведомлены, на странице и в области уведомлений. ).
Для всех продуктов, которые теперь помечены как сделки, он определяет всех пользователей, которые могут быть в них заинтересованы, на основе их настроек, и генерирует для них события временной шкалы, а также создает больше событий TANK, по одному для каждого поддерживаемого метода уведомления. (электронная почта, слабина, толчок). Все эти новые события атомарно связаны с еще одной темой TANK. Эти события эффективно фиксируют контекст уведомления (информацию о пользователе (получателе), информацию о продуктах, способ уведомления и т. Д.).
Интересно, что эта служба кэширует информацию о пользовательских настройках и вариантах доставки в памяти и обновляет их всякий раз, когда эти параметры обновляются, реагируя на события, создаваемые в темах TANK, всякий раз, когда пользователи обновляют свои параметры.

Есть еще одна служба, которая использует эту тему и отвечает за обработку запланированных уведомлений. Для событий Slack он напрямую взаимодействует через параллельные HTTP-запросы с API Slack. Для push-уведомлений он взаимодействует с кластерами узловых серверов, которые мы запускаем локально, которые, в свою очередь, взаимодействуют с API-интерфейсами Google Push-уведомлений.
Однако для электронной почты это становится немного интереснее. Эта служба будет взаимодействовать (через RPC / одновременные HTTP-запросы) с кластером служб узлов, который отвечает за создание сообщений электронной почты. Наши инженеры создали эту службу, которая принимает полезные данные JSON, описывающие тип и другие конкретные свойства, и генерирует сообщение электронной почты, которое будет доставлено пользователю. Может быть, когда-нибудь кто-нибудь из нашей команды фронтенд-инженеров опишет, как это работает на практике. Это довольно элегантно.
После того, как «контролер уведомлений» (мы называем все эти службы надзирателями), собирает сгенерированные сообщения электронной почты, он генерирует для них новые события, включая тему, текст, получателя. адрес электронной почты и многое другое, и упаковывает их все в новые события, которые создаются для еще одной темы TANK.

Другой надзиратель отслеживает эту тему TANK, которую мы используем для планирования исходящих сообщений электронной почты. Как только он принимает одно такое сообщение, он распаковывает его и, используя нашу библиотеку почтового клиента, взаимодействует с другой службой, описанной в следующем абзаце, чтобы поставить его в очередь для доставки. Каждому такому электронному письму, поставленному в очередь на доставку, библиотека почтового клиента присваивает отдельный идентификатор. Эта клиентская библиотека связывает этот идентификатор исходящего почтового сообщения с событием электронной почты BestPrice (чтобы мы могли знать, что электронное письмо, запланированное на BestPrice определенного типа, с определенной отметкой времени, конкретному получателю и т. Д., Связано с идентификатором почты).

Служба планировщика исходящей почты отвечает за управление всей исходящей электронной почтой нашей компании. Он говорит по SMTP и взаимодействует с нашими Postfix MTA для планирования электронной почты. Он понимает различные ответы и поддерживает повторные попытки, постепенную отсрочку и другие полезные функции.
Он, в свою очередь, будет генерировать события, которые включают идентификатор почтового сообщения и статус (сбой, успешно поставлен в очередь, возвращено и т. д.). Когда почтовое сообщение ставится в очередь для доставки с помощью Postfix, конкретный идентификатор сообщения Postfix связывается с нашим идентификатором почтового сообщения.

Другая служба отслеживает журнал Postfix на предмет проблем, связанных с доставкой почты (отказы и т. Д.). Он пытается сопоставить идентификатор сообщения Postfix с идентификатором почтового сообщения, и если это так, он генерирует новое событие для (идентификатор почтового сообщения, контекст сбоя) в теме TANK, которую мы используем для отслеживания проблем с доставкой почты.

Еще одна служба (еще устала? :) отслеживает эту тему и обрабатывает эти события. Например, если мы получаем сообщение о недоставке и можем связать идентификатор почтового сообщения с электронным уведомлением по лучшей цене, тогда он помечает адрес электронной почты как недоступный, чтобы мы перестали отправлять на него электронные письма в будущем, а также предупредили пользователя. когда она снова использует BestPrice, чтобы, возможно, попробовать другой адрес электронной почты.

На самом деле в сервисе сделок задействовано больше сервисов, таких как тот, который фиксирует показатели взаимодействия и вовлеченности (т. Е. Процент открытия почты и клики в сообщении электронной почты), которые будут сохранены в еще одной теме TANK для обработки еще одним надзирателем, который строит сообщает и, как вы уже догадались, выдает больше событий.

Есть функции, в которых задействовано гораздо больше контролеров. Один, в частности, отслеживает более 15 тем и управляет инвентаризацией всех 7 или около того миллионов продуктов и генерирует события, которые обрабатываются двумя дюжинами других контролеров. Это может быть ошеломляющим, но на самом деле это очень просто, потому что все они отделены и изолированы друг от друга.

Есть много преимуществ, связанных с обработкой событий. Мы можем устранять проблемы, воспроизводя события с определенного момента времени, мы можем фиксировать события, даже если они нам не нужны, и использовать их позже, когда мы думаем о чем-то полезном, что с ними можно сделать. Подробнее о таких преимуществах читайте здесь.

Если вас это интересует, мы нанимаем.