NServiceBus: как заставить подписчика подписаться на несколько типов сообщений (каждый из другой очереди)

Я хотел бы, чтобы мой подписчик обрабатывал два разных потока сообщений. Я ожидаю, что будет очередь MSMQ для каждого типа сообщений, но я не вижу, как указать более одной InputQueue в разделе MsmqTransportConfig в моем файле .config.

Вот конфигурация без XML для моего подписчика:

        Configure.With(new[] { typeof(EventMessage), typeof(EventMessageHandler), typeof(NServiceBus.Unicast.Transport.CompletionMessage) })
            .CustomConfigurationSource(new UserConfigurationSource()
               .Register(() => new MsmqTransportConfig { InputQueue = "Subscriber1InputQueue", ErrorQueue = "error", NumberOfWorkerThreads = 1, MaxRetries = 5 }))
            .DefaultBuilder()
            .XmlSerializer()
            .MsmqTransport()
              .IsTransactional(true)
          .UnicastBus()
              .DoNotAutoSubscribe()
              .LoadMessageHandlers()
          .CreateBus()
          .Start();

РЕДАКТИРОВАТЬ: кажется, я получаю разные ответы от разных людей. Всем спасибо! Я думаю, что у меня есть ответ на мой вопрос, а именно: процесс, использующий NServiceBus (будь то издатель или подписчик), может получать сообщения только в ОДНОЙ очереди. Для меня это ненужное ограничение, и очень жаль, что NServiceBus работает таким образом. Я не хочу иметь несколько процессов для получения сообщений, и я не хочу, чтобы все они попадали в одну и ту же очередь. Если есть проблема с определенным обработчиком сообщений, я бы хотел, чтобы очередь ошибок для этого конкретного типа сообщений увеличивалась в размере. Я думаю, это позволило бы лучше видеть, что происходит в системе.


person skb    schedule 03.12.2010    source источник
comment
+1 - это отличный вопрос, с которым я тоже борюсь.   -  person TrueWill    schedule 13.09.2011


Ответы (3)


еще не использовали конфигурацию без xml, но с файлом конфигурации это будет выглядеть так:

<MsmqTransportConfig InputQueue="WorkerQueueForCurrentService" ErrorQueue="ErrorQueue" NumberOfWorkerThreads="1" MaxRetries="5"/>

<UnicastBusConfig>
    <MessageEndpointMappings>
        <add Messages="AssemblyName1" Endpoint="PublisherQueue1" />
        <add Messages="AssemblyName2.Message1, AssemblyName2" Endpoint="PublisherQueue2" />
        <add Messages="AssemblyName2.Message3, AssemblyName2" Endpoint="PublisherQueue2" />
    </MessageEndpointMappings>
</UnicastBusConfig>

поэтому ваша рабочая очередь для текущей службы — «WorkerQueueForCurrentService», и она подписывается на разные сообщения, которые публикуются в очередях «PublisherQueue1» и «PublisherQueue2». я включил образец для подписки на всю сборку сообщений (см. строку добавления сообщений 1) и для конкретных сообщений в данной сборке сообщений (см. строки добавления сообщений 2 и 3).

Ответ Кристиана Кристенсена неверен. входная очередь актуальна для каждой службы, использующей nservicebus. независимо от того, издатель это или подписчик. издатель получает уведомления о подписке во входной очереди, а подписчик устанавливает входную очередь в качестве очереди назначения для уведомления о подписке, отправляемого издателю.

если вы хотите программно подписаться на сообщения, как говорит mrnye, вам понадобится отображение конечной точки сообщения. поэтому, если вы делаете bus.subscribe, nservicebus просматривает его сопоставления конечных точек сообщений и пытается извлечь имя очереди издателя, в которой публикуется это сообщение.

Отображение конечной точки сообщения используется для обоих:
- поиска того, какие сообщения публикуются, где
и
- очереди назначения, в которую отправляются сообщения, которые вы bus.send()

надеюсь, это прояснит некоторые вещи :-)

person Stephan Schinkel    schedule 06.12.2010
comment
Спасибо, хактик. Это проясняет ситуацию. Есть одна вещь, которую вы сказали, с которой, я думаю, я не согласен: сопоставления конечных точек сообщений используются для... поиска того, какие сообщения и где публикуются. Я думаю, что на самом деле это конечные точки/типы сообщений, на которые процесс хочет подписаться (будь то автоматический или ручной). Я так думаю, потому что я даже не определил это для своего издателя ИЛИ моей службы подписки, потому что я не использую механизм автоматической подписки и использую свою собственную реализацию ISubscriptionStorage. - person skb; 06.12.2010
comment
привет skb, поиск того, какие сообщения публикуются, где относится к тому, хочу ли я подписаться на сообщение, которое я делаю bus.subscribe(messagetype) и nservicebus смотрит, где это сообщение опубликовано. чтобы nservicebus знал, куда отправлять уведомление о подписке. извините за нечеткое определение. - person Stephan Schinkel; 07.12.2010

В NServiceBus все сообщения проходят через одну очередь. Это сопоставление 1:1 между очередями:процессами. Таким образом, с помощью DoNotAutoSubscribe() вы просто вручную подписываетесь на нужные сообщения с сопоставлениями в вашем app.config.

например, для подписки используйте функцию после вашей конфигурации

_Bus.Subscribe<SomeMessage>();

Я не могу вспомнить синтаксис отображения сообщений, извините.

person mike    schedule 04.12.2010

Ознакомьтесь с конфигурацией публикации/подписки. InputQueue указывается для элемента Publisher, а не для подписчика. Последний добавляет интересующие его сообщения в MessageEndpointMappings в UnicastBusConfig. Если вас интересуют два разных потока, просто добавьте элементы в MessageEndpointMappings.

person Kristian Kristensen    schedule 03.12.2010
comment
Я действительно смущен. :) Я использовал MsmqTransportConfig, чтобы указать, как мой ПОДПИСЧИК получает сообщения... и это РАБОТАЕТ! Я обновил вопрос, чтобы отразить мою конфигурацию. Можете ли вы сказать мне, почему это работает? - person skb; 04.12.2010
comment
Теперь у меня другой вопрос. Почему в элементе MessageEndPointMappings нет значения ErrorQueue? Похоже, если бы у подписчика была ошибка при обработке сообщения, он бы не знал, куда его поместить. Я надеялся, что MessageQueueA, MessageQueueB, MessageQueueC и ErrorQueueA, ErrorQueueB, ErrorQueueC будут соответствовать типам сообщений TypeA, TypeB, TypeC, чтобы, если сообщение не удалось обработать 5 раз (из-за ошибки на подписчике), Я мог бы иметь другой процесс для перемещения сообщения из ErrorQueue* в MessageQueue* позже. - person skb; 04.12.2010
comment
MessageEndpointMappings указывает, где находится ваш издатель, поэтому ваш подписчик знает, куда отправлять сообщения о подписке. Когда подписка настроена, издатель отправит опубликованное сообщение в конечную точку, описанную в InputQueue (в вашем примере Subscriber1InputQueue). Если обработка этого сообщения не удалась, это сообщение будет перемещено в очередь ошибок, указанную в вашем коде выше. Имеет ли это смысл? - person Kristian Kristensen; 04.12.2010
comment
Skb — для достижения отдельных очередей вам понадобится отдельный процесс для каждого типа сообщений. - person Adam Fyles; 04.12.2010