
Предыстория
В процессе разработки крупномасштабного проекта со слишком большим количеством микросервисов я работал над собственной библиотекой для обмена сообщениями очереди проекта.
Библиотека, предназначенная для инкапсуляции интеграции с выбранной технологией очереди, конкретные API и их использование, а также рейд на возможности DI в NestJS.
Дизайн и рекомендации
В проекте использовалась технология Azure Service Bus. Общая политика в отношении микрослужб, использующих очередь, заключается в том, что каждая служба получает свою собственную тему, только служба может писать в свою тему, любая другая соответствующая служба может читать из нее (с отказом от ответственности в некоторых случаях, например, общая тема для уведомлений клиентов). , любой сервис может писать).
Таким образом, служба может писать в одну тему и читать из нескольких тем. Чтобы службы, которым требуется доступ для чтения к теме, не гонялись за чтением одного и того же сообщения, получили специальную подписку, настроенную для этой темы.
Подписка на тему содержит условие, применяемое к сообщению, поэтому служба, подписавшаяся на нее, получит только соответствующее сообщение.
Например, обе службы X и Z должны читать из темы A, тема A будет иметь две подписки sub_X и sub_Z, каждая подписка фильтрует сообщения, относящиеся к каждой службе, и в процессе клонирует сообщение для подписки.

Каждая тема имеет две строки подключения, одну для чтения и одну для записи.
Код и использование
Давайте представим использование конечным пользователем (разработчиком) библиотеки, поддерживающей такие возможности.
// pseudo code for Library consumption
Service X
MessageBrokerModule: {
strategy: AzureServiceBusProviderClass, // optional - this is default
connections: {
connection to topic B - Write :
configService.get('topic-b-pub-connection-string')
connection to topic A - Read :
configService.get('topic-a-sub-connection-string')
connection to topic C - Read :
configService.get('topic-c-sub-connection-string')
},
listiners: [
{ class: TopicAListenerClass, topic_name: TopicA, sub_name: SubX },
{ class: TopicCListenerClass, topic_name: TopicC, sub_name: SubX }
]
}
Прежде чем увидеть реальный код, вот несколько соответствующих ссылок:
- Динамические модули NestJS
- Конфигурация NestJS
- События жизненного цикла приложения NestJS
- Служебная шина Azure (опционально)
Стратегия
Библиотека разработана с несвязанной абстракцией конкретной службы обмена сообщениями для достижения независимости от поставщика. strategy — внутренний адаптер, используемый для взаимодействия со службой обмена сообщениями, может быть предоставлен в модуле, если он реализует interface MessageBrokerProvider;
export interface MessageListenerProvider {
listenToTopic(listener: TopicListener, options?: ListenToTopicOptions): SubscriptionClose;
listenToQueue(queueName: string): Promise<boolean>;
isTopicListenerConnected(topicListener: TopicListener): boolean;
}
export interface MessagePublisherProvider {
postMessageToTopic(topicName: string, payload: string, options?: PostMessageOptions): Promise<void>;
postMessageToQueue(queueName: string, payload: string): Promise<boolean>;
}
export interface MessageBrokerProvider extends
MessageListenerProvider,
MessagePublisherProvider { }
Соединения
Для каждого соединения будет экспортирован поставщик MessageBrokerService, чтобы пользователь мог внедрить его в любом месте с помощью @Inject(<token>).
Внутри модуля для каждого соединения соберите FactoryProvider, для этого пользователь предоставляет строку provide — при этом MessageBrokerService (для соединения) будет @Inject , функцией useFactory и необязательным массивом inject поставщики, внедренные в FactoryProvider ;
MessageBrokerConnectionModule.forRoot({
strategy: FakeQueueService,
providers: [
{
useFactory: (config: ConfigService): MessageBrokerModuleConfigCore => ({
connectionString: config.get('SERVICEBUS_SUB_CONNECTION_FAKE_01'),
}),
inject: [ConfigService],
provide: MESSAGE_BROKER_FAKE_01_TOPIC_READ,
},
{
useFactory: (config: ConfigService): MessageBrokerModuleConfigCore => ({
connectionString: config.get('SERVICEBUS_PUB_CONNECTION_FAKE_02'),
}),
inject: [ConfigService],
provide: MESSAGE_BROKER_FAKE_02_TOPIC_WRITE,
}
]
})
Слушатели
Listener — это класс injectable, реализующий интерфейс TopicListener;
export interface TopicListener {
handleMessage(message: HandleMessageArgs): Promise<void>;
handleError(args: HandleErrorArgs): Promise<void>;
shouldListen(): boolean;
isConnected?: () => boolean;
topicName: string;
subscriptionName: string;
}
Слушатели регистрируются в паре с connectionToken (токен provide) MessageBrokerService , поэтому @Module MessageBrokerListenersModule подписывают слушателя на правильную тему.
Если класс Listener украшен @TopicListenerInjectable({ connectionToken: <token>}) , регистрацию можно выполнить следующим образом:
MessageBrokerListenersModule.forRoot({
providers: [ConfigService], // providers are optional
listeners: [
MessageBrokerFakeTopicListener,
// more listeners here ...
]
})
Если класс Listener украшен стандартом@Injectable, слушатель должен быть зарегистрирован с прикрепленным connectionToken, например:
MessageBrokerListenersModule.forRoot({
providers: [ConfigService], // providers are optional
listeners: [
{
listenerClass: MessageBrokerFakeTopicListener,
connectionToken: MESSAGE_BROKER_FAKE_01_TOPIC_READ
}
]
})
Написать сообщение
И, наконец, чтобы написать сообщение в конкретную тему, вставьте нужный MessageBrokerService — экспортированный из MessageBrokerConnectionModule, с @Inject(<connectionToken>) в соответствующем сервисе, и вызовите метод postMessageToTopic;
@Injectable()
export class AppService {
constructor(
@Inject(MESSAGE_BROKER_FAKE_02_TOPIC_WRITE)
private writeMessageService: MessageBrokerService
) {}
doSomething() {
this.writeMessageService.postMessageToTopic(...)
// ...
}
}