Предыстория

В процессе разработки крупномасштабного проекта со слишком большим количеством микросервисов я работал над собственной библиотекой для обмена сообщениями очереди проекта.
Библиотека, предназначенная для инкапсуляции интеграции с выбранной технологией очереди, конкретные API и их использование, а также рейд на возможности DI в NestJS.

Дизайн и рекомендации

В проекте использовалась технология Azure Service Bus. Общая политика в отношении микрослужб, использующих очередь, заключается в том, что каждая служба получает свою собственную тему, только служба может писать в свою тему, любая другая соответствующая служба может читать из нее (с отказом от ответственности в некоторых случаях, например, общая тема для уведомлений клиентов). , любой сервис может писать).

Таким образом, служба может писать в одну тему и читать из нескольких тем. Чтобы службы, которым требуется доступ для чтения к теме, не гонялись за чтением одного и того же сообщения, получили специальную подписку, настроенную для этой темы.
Подписка на тему содержит условие, применяемое к сообщению, поэтому служба, подписавшаяся на нее, получит только соответствующее сообщение.

Например, обе службы X и Z должны читать из темы A, тема A будет иметь две подписки sub_X и sub_Z, каждая подписка фильтрует сообщения, относящиеся к каждой службе, и в процессе клонирует сообщение для подписки.

Каждая тема имеет две строки подключения, одну для чтения и одну для записи.

Код и использование

Давайте представим использование конечным пользователем (разработчиком) библиотеки, поддерживающей такие возможности.

// pseudo code for Library consumption 
Service X
  MessageBrokerModule: {
    strategy: AzureServiceBusProviderClass, // optional - this is default 
    connections: {
      connection to topic B - Write : 
        configService.get('topic-b-pub-connection-string')
      connection to topic A - Read : 
        configService.get('topic-a-sub-connection-string')
      connection to topic C - Read : 
        configService.get('topic-c-sub-connection-string')
    },
    listiners: [
      { class: TopicAListenerClass, topic_name: TopicA, sub_name: SubX },
      { class: TopicCListenerClass, topic_name: TopicC, sub_name: SubX }
    ]
  }

Прежде чем увидеть реальный код, вот несколько соответствующих ссылок:

Стратегия

Библиотека разработана с несвязанной абстракцией конкретной службы обмена сообщениями для достижения независимости от поставщика. strategy — внутренний адаптер, используемый для взаимодействия со службой обмена сообщениями, может быть предоставлен в модуле, если он реализует interface MessageBrokerProvider;

export interface MessageListenerProvider {
    listenToTopic(listener: TopicListener, options?: ListenToTopicOptions): SubscriptionClose;
    listenToQueue(queueName: string): Promise<boolean>;
    isTopicListenerConnected(topicListener: TopicListener): boolean;
}

export interface MessagePublisherProvider {
    postMessageToTopic(topicName: string, payload: string, options?: PostMessageOptions): Promise<void>;
    postMessageToQueue(queueName: string, payload: string): Promise<boolean>;
}

export interface MessageBrokerProvider extends 
  MessageListenerProvider, 
  MessagePublisherProvider { }

Соединения

Для каждого соединения будет экспортирован поставщик MessageBrokerService, чтобы пользователь мог внедрить его в любом месте с помощью @Inject(<token>).
Внутри модуля для каждого соединения соберите FactoryProvider, для этого пользователь предоставляет строку provide — при этом MessageBrokerService (для соединения) будет @Inject , функцией useFactory и необязательным массивом inject поставщики, внедренные в FactoryProvider ;

MessageBrokerConnectionModule.forRoot({
  strategy: FakeQueueService,
  providers: [
    {
      useFactory: (config: ConfigService): MessageBrokerModuleConfigCore => ({
        connectionString: config.get('SERVICEBUS_SUB_CONNECTION_FAKE_01'),
      }),
      inject: [ConfigService],
      provide: MESSAGE_BROKER_FAKE_01_TOPIC_READ,
    },
    {
      useFactory: (config: ConfigService): MessageBrokerModuleConfigCore => ({
        connectionString: config.get('SERVICEBUS_PUB_CONNECTION_FAKE_02'),
      }),
      inject: [ConfigService],
      provide: MESSAGE_BROKER_FAKE_02_TOPIC_WRITE,
    }
  ]
})

Слушатели

Listener — это класс injectable, реализующий интерфейс TopicListener;

export interface TopicListener {
    handleMessage(message: HandleMessageArgs): Promise<void>;
    handleError(args: HandleErrorArgs): Promise<void>;
    shouldListen(): boolean;
    isConnected?: () => boolean;
    topicName: string;
    subscriptionName: string;
}

Слушатели регистрируются в паре с connectionToken (токен provide) MessageBrokerService , поэтому @Module MessageBrokerListenersModule подписывают слушателя на правильную тему.
Если класс Listener украшен @TopicListenerInjectable({ connectionToken: <token>}) , регистрацию можно выполнить следующим образом:

MessageBrokerListenersModule.forRoot({
    providers: [ConfigService], // providers are optional
    listeners: [
        MessageBrokerFakeTopicListener,
        // more listeners here ... 
    ]
})

Если класс Listener украшен стандартом@Injectable, слушатель должен быть зарегистрирован с прикрепленным connectionToken, например:

MessageBrokerListenersModule.forRoot({
  providers: [ConfigService], // providers are optional
  listeners: [
    {
      listenerClass: MessageBrokerFakeTopicListener,
      connectionToken: MESSAGE_BROKER_FAKE_01_TOPIC_READ
    }
  ]
})

Написать сообщение

И, наконец, чтобы написать сообщение в конкретную тему, вставьте нужный MessageBrokerService — экспортированный из MessageBrokerConnectionModule, с @Inject(<connectionToken>) в соответствующем сервисе, и вызовите метод postMessageToTopic;

@Injectable()
export class AppService {
  constructor(
    @Inject(MESSAGE_BROKER_FAKE_02_TOPIC_WRITE) 
    private writeMessageService: MessageBrokerService
  ) {}
  doSomething() {
    this.writeMessageService.postMessageToTopic(...)
    // ...
  }
} 

Вот исходный код библиотеки

Библиотека npm (в бета-версии)