Микросервисы не отвечают после подписки на обмен разветвления RabbitMQ

Мое веб-приложение .net core3.1 имеет, скажем, 4 микросервиса (MasterMS, PartyMS, ProductMS, PurchaseMS) и использует Rabbitmq в качестве брокера сообщений.

В одном конкретном сценарии MasterMS публикует событие (вставка / обновление в таблице Company) на обмен Rabbitmq (xAlexa), откуда оно распределяется по соответствующим очередям всех подписавшихся MS (PartyMS, ProductMS).

PartyMS получает событие из очереди CompanyEventPartyMS, а ProductMS получает его из очереди CompanyEventProductMS. Таким образом, и Сторона, и Продукт обновляют свою соответствующую таблицу Компании, и все синхронизируется и идеально. Кстати, PurchaseMS не подписывается и поэтому не заморачивался.

Теперь возникает настоящая проблема. Подписавшиеся MS (потребители) не отвечают, когда их веб-страница запрашивается. Веб-страницы PartyMS и ProductMS выдают SocketException, в то время как не подписчик PurchaseMS работает нормально. Теперь, если я закомментирую строку, в которой подписывается PartyMS, она снова начнет работать, хотя больше не получает CompanyEvent и выходит из синхронизации. Любые идеи друзей?

SocketException: невозможно установить соединение, потому что целевая машина активно отказалась от него. System.Net.Http.ConnectHelper.ConnectAsync (строковый хост, int порт, CancellationToken cancellationToken)

public void Publish<T>(T @event) where T : Event
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "xAlexa", type: ExchangeType.Fanout);

                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);
                var eventName = @event.GetType().Name;

                channel.BasicPublish(exchange: "xAlexa",
                                 routingKey: eventName, //string.Empty,
                                 basicProperties: null,
                                 body: body);
            }
        }

StartBasicConsume

private void StartBasicConsume<T>() where T : Event
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                DispatchConsumersAsync = true
            };

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();

            var eventName = typeof(T).Name;
            var msName = typeof(T).FullName;
            string[] str = { };
            str = msName.Split('.');
            eventName += str[1];
            
            channel.ExchangeDeclare(exchange: "xAlexa",
                                        type: ExchangeType.Fanout);
                        
            channel.QueueDeclare(eventName, true, false, false, null);  //channel.QueueDeclare().QueueName;

            channel.QueueBind(queue: eventName,
                              exchange: "xAlexa",
                              routingKey: string.Empty);

            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += Consumer_Received;

            channel.BasicConsume(eventName, true, consumer);
            Console.WriteLine("Consumer Started");
            Console.ReadLine();
        }

     private async Task Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var eventName = e.RoutingKey;
            var body = e.Body.ToArray();
            //var body = e.Body.Span;
            var message = Encoding.UTF8.GetString(body);
            //var message = Encoding.UTF8.GetString(e.Body);
            Console.WriteLine(message);

            try
            {
                await ProcessEvent(eventName, message).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
            }
        }

Вызов ProductsMS Api из приложения MVC (здесь он не работает, если подписан, и работает, если не подписан на CompanyEvent!)

public class ProductService:IProductService
    {
        private readonly HttpClient _apiCLient;

        public ProductService(HttpClient apiCLient)
        {
            _apiCLient = apiCLient;
        }

       public async Task<List<Product>> GetProducts()
        {
            var uri = "https://localhost:5005/api/ProductApi";
            List<Product> userList = new List<Product>();
            HttpResponseMessage response = await _apiCLient.GetAsync(uri);
            if (response.IsSuccessStatusCode)
            {
                var readTask = response.Content.ReadAsStringAsync().Result;
                userList = JsonConvert.DeserializeObject<List<Product>>(readTask);
            }
            return userList;
        }
}

Найдите ProductsMS Api Startup.cs ниже:

namespace Alexa.ProductMS.Api
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            var connectionString = Configuration["DbContextSettings:ConnectionString"];
            var dbPassword = Configuration["DbContextSettings:DbPassword"];
            var builder = new NpgsqlConnectionStringBuilder(connectionString)
            {
                Password = dbPassword
            };
            services.AddDbContext<ProductsDBContext>(opts => opts.UseNpgsql(builder.ConnectionString));

            services.AddMediatR(typeof(Startup));

            RegisterServices(services);
        }

        private void RegisterServices(IServiceCollection services)
        {
            DependencyContainer.RegisterServices(services);
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            });
            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });

            ConfigureEventBus(app); //WORKS IF COMMENTED; FAILS OTHERWISE <---
        }

        private void ConfigureEventBus(IApplicationBuilder app)
        {
            var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();            
            eventBus.Subscribe<CompanyEvent, CompanyEventHandler>();
            eventBus.Subscribe<PartyEvent, PartyEventHandler>();
        }

    }
}

Также смотрите изображения:

Обмен разветвления RabbitMQ

Очереди RabbitMQ

обмен

очереди


person Graison K    schedule 19.11.2020    source источник
comment
Здесь отсутствует множество деталей. Как вы обслуживаете эти веб-страницы? Могут ли те MS, которые терпят неудачу с SocketException, потреблять больше сообщений? Что ваш код делает на Consumer_Received?   -  person Bohdan Stupak    schedule 19.11.2020
comment
Отредактировал и добавил то, что вы просили.   -  person Graison K    schedule 20.11.2020


Ответы (1)


Удалите последнюю строку Console.ReadLine (); метода StartBasicConsume (). Когда мы используем эту строку в функции, она ожидает нажатия любой клавиши или любого ввода.

person Neeraj    schedule 28.01.2021