Ребус отправляется из Bus2 в обработчике сообщений Bus1

У меня есть автобусы Rebus, которые используют Azure ServiceBus, но когда я пытаюсь отправить из Bus2 в один из обработчиков сообщений Bus1, это не работает. Сообщение не отправляется.

Есть идеи ?

РЕДАКТИРОВАТЬ

Автобус 1

        string padesQueueAddress = "padesworker";
        int numberOfWorkes = Settings.NumberOfWorkers>0   Settings.NumberOfWorkers:10;

        string errorQueueAddress = string.Format("{0}-error", queueAddress);

        var adapter = new AutofacContainerAdapter(Container);
        Bus = Configure.With(adapter)
             .Logging(l =>  l.Use(UseRaygunRebusLoggingFactory(rayclient,Settings.Debug ? RaygunLoggerLevel.DEBUG :  RaygunLoggerLevel.WARN)))
             .Transport(t => t.UseAzureServiceBus( Settings.AzureQueueConnectionString, queueAddress,AzureServiceBusMode.Standard))                                  
             .Sagas(s => s.StoreInSqlServer(string.IsNullOrWhiteSpace(Settings.RebusSagaSqlConnectionString) ? Settings.AzureSqlConnectionString : Settings.RebusSagaSqlConnectionString, "Saga", "SagaIndex") )
             .Routing(r => r.TypeBased().MapAssemblyOf<SendSmsCommand>(queueAddress).MapAssemblyOf<Unipluss.Sign.Pades.Commands.CreatePadesCommand>(padesQueueAddress))                 
            .Options(o =>
            {
                o.SimpleRetryStrategy(secondLevelRetriesEnabled: true, maxDeliveryAttempts:5,errorQueueAddress: errorQueueAddress);

                o.SetNumberOfWorkers(numberOfWorkes);
                o.SetMaxParallelism(numberOfWorkes);                    

            })                
            .Start();

        await Bus.SendLocal(new HeartBeatCommand());

Автобус 2

    private  IBus CreateExternalEventBus()
    {

        var eventBus = Configure.With(new BuiltinHandlerActivator())
                .Transport(t => t.UseAzureServiceBus(Settings.EventServiceBusConnectionString, queueAddress+"_event", AzureServiceBusMode.Basic))
                .Logging(x=>x.ColoredConsole(LogLevel.Debug))
                .Options(o =>
                {

                o.LogPipeline(true);
                o.EnableCompression();
                o.EnableEncryption(Settings.RebusEncryptionExternalEvents);
                })
                .Start();

        eventBus.Advanced.Routing.Send("1dd0f6f9422146048516a30f00aef4e5",new Unipluss.Sign.Events.Entities.DocumentCancledEvent() {CancledMessage = "test",DocumentId = Guid.NewGuid()});

        return eventBus;

    }

Отправка eventBus с жестко запрограммированной отправкой работает, но когда я отправляю ее из одного из обработчиков сообщений Bus1, сообщение не отправляется (при регистрации в журнале сообщается, что сообщение отправлено, но оно появляется в очереди).

Шина 2 обернута в класс-оболочку, а затем введена в Autofac, чтобы избежать интерфейсов IBus в Autofac.

        builder.Register(c => new ExternalEventsBus(CreateExternalEventBus()))
 .As<IExternalEventsBus>().SingleInstance();

public class ExternalEventsBus:IExternalEventsBus
{
    private IBus Bus;

    public ExternalEventsBus(IBus bus)
    {
        Bus = bus;
    }

    public async Task Send(object message, Guid documentProviderId)
    {
        await Bus.Advanced.Routing.Send(documentProviderId.ToString("n"), message);

    }

    public Task Send(object message, DocumentProvider documentProvider)
    {
        if (!string.IsNullOrWhiteSpace(documentProvider.RebusQueueConnectionString))
            return Send(message, documentProvider.Id);

        return Task.FromResult(true);
    }

    public void Dispose()
    {
        if(Bus!=null)
            Bus.Dispose();
    }
}

Затем IExternalEventsBus используется в нескольких обработчиках сообщений в Bus1.


person Rune Synnevåg    schedule 18.03.2016    source источник
comment
Не могли бы вы рассказать немного больше об именах входных очередей двух шин, настройке маршрутизации и т. д.?   -  person mookid8000    schedule 18.03.2016
comment
Я хотел бы помочь вам :) пожалуйста, дайте мне знать больше о том, что вы пытаетесь сделать, и как вы пытаетесь это сделать....   -  person mookid8000    schedule 20.03.2016
comment
У меня есть внутренняя очередь (с использованием Azure ServiceBus и Rebus), именуемая Bus1. Затем у нас есть вторая внешняя очередь (также использующая Azure ServiceBus и Rebus), называемая Bus2. Когда происходят события, Bus1 сохраняет состояния и данные в БД, а затем отправляет сообщение Bus2. Каждая учетная запись может включать события в своей учетной записи и получает собственную очередь. Проблема возникает при отправке сообщения на Bus2 внутри из MessageHandler, принадлежащего Bus1. Когда я делаю Bus2.Send из своей службы, это работает нормально, но когда я делаю Bus2.Send внутри функции Handle (Message message), сообщение не отправляется.   -  person Rune Synnevåg    schedule 20.03.2016
comment
Для шины 1 (внутренняя) используется обычная маршрутизация, при которой все сообщения живут в одной сборке. Для Bus2 (внешней) мы не используем маршрутизацию, потому что мы отправляем сообщение в именованную очередь (идентификатор учетной записи клиента — это guid), используя Bus.Advanced.Routing.Send(guid,message).   -  person Rune Synnevåg    schedule 20.03.2016
comment
было бы очень полезно, если бы вы могли отредактировать исходный вопрос и вставить соответствующие фрагменты кода, то есть биты конфигурации и биты, в которые отправляются сообщения.   -  person mookid8000    schedule 21.03.2016
comment
Я мог бы вставить конфигурацию, но сообщение отправляется через службы, которые вводят шину через Autofac, так что это не так просто.   -  person Rune Synnevåg    schedule 21.03.2016
comment
Я просто ищу биты Configure.With(...) и биты await _bus.Send(..)...   -  person mookid8000    schedule 21.03.2016


Ответы (1)


Хорошо... позвольте мне посмотреть, понимаю ли я это (пожалуйста, поправьте меня, если я ошибаюсь):

У вас есть два экземпляра шины в вашем процессе:

  1. Тот, который использует входную очередь со значением, указанным queueAddress, которое вы здесь не указали.
  2. Другой («внешняя шина событий»), у которого есть входная очередь со значением, указанным queueAddress+"_event", которое вы также не включили сюда.

Похоже, цель ваших экземпляров шины заключается в том, что первый используется для координации вещей внутри приложения, тогда как второй используется для маршрутизации событий слушателям снаружи — это то, что вы бы назвали "контент-ориентированный маршрутизатор", так как он будет маршрутизировать сообщения в зависимости от некоторого значения содержания сообщения (в данном случае documentProviderId).

Теперь вы столкнулись с ошибкой: когда шина (1) использует шину (2) из ​​одного из своих собственных обработчиков сообщений, кажется, что маршрутизируемое сообщение не отправляется.

Из кода, который вы отправили из ExternalEventsBus, неясно, какой метод Send вы вызываете, но я могу сказать вам, что метод с сигнатурой public Task Send(object message, DocumentProvider documentProvider) будет отправлять сообщение только в том случае, если documentProvider.RebusQueueConnectionString не равен нулю.

  1. Вы убедились, что строка подключения действительно содержит значение?
  2. Почему в ExternalEventsBus есть поле _connectionString? Вы должны были использовать это при создании автобуса?
  3. Помните ли вы await bus.Send(...) (то есть await результат асинхронной операции) каждый раз, когда вы вызываете автобус?
person mookid8000    schedule 21.03.2016
comment
1. Да, строка подключения имеет значение. 2. _connectionString был предназначен только для проверки того, что при создании ExternalEventBus использовалась правильная строка подключения (AzureServiceBus). Шина 2 (внешняя) использует маршрутизацию контента, где у каждой учетной записи есть отдельная очередь. Когда мы используем шину 2 вне шины 1 (например, при запуске), это работает нормально, но из функции дескриптора шины 1 сообщение исчезает. Любая теория? - person Rune Synnevåg; 21.03.2016
comment
только что добавил пулю 3... :) - person mookid8000; 21.03.2016
comment
Да, он использует await Send из асинхронных методов и или Task.Run(async()=›await Send(...)) из методов синхронизации. - person Rune Synnevåg; 21.03.2016
comment
не мог бы ты попробовать bus.Send(...).Wait() вместо этой забавной двойной асинхронной лямбды, которую ты там делаешь? :D - person mookid8000; 21.03.2016
comment
Конечно, я мог бы попробовать это, может быть, это асинхронная операция, которая не ожидается, поэтому отправка прерывается до ее завершения? - person Rune Synnevåg; 21.03.2016
comment
я предполагаю, что асинхронный bus.Send каким-то образом ускользает от вас, что приводит к тому, что любые выброшенные исключения просто исчезают.... - person mookid8000; 21.03.2016
comment
Я проверил код сейчас, все функции являются асинхронными задачами на всем пути от обработчика сообщений до команды отправки Bus2. Придется отлаживать код, чтобы проверить на 100% (посмотрел только исходники на github) У меня нет к нему доступа из домашнего офиса. Но мне кажется, что это может быть что-то другое. - person Rune Synnevåg; 21.03.2016