Мое веб-приложение .net core3.1 имеет, скажем, 4 микросервиса (MasterMS, PartyMS, ProductMS, PurchaseMS) и использует Rabbitmq в качестве брокера сообщений.
В одном конкретном сценарии MasterMS публикует событие (вставка / обновление в таблице Company) на обмен Rabbitmq (xAlexa), откуда оно распределяется по соответствующим очередям всех подписавшихся MS (PartyMS, ProductMS).
PartyMS получает событие из очереди CompanyEventPartyMS, а ProductMS получает его из очереди CompanyEventProductMS. Таким образом, и Сторона, и Продукт обновляют свою соответствующую таблицу Компании, и все синхронизируется и идеально. Кстати, PurchaseMS не подписывается и поэтому не заморачивался.
Теперь возникает настоящая проблема. Подписавшиеся MS (потребители) не отвечают, когда их веб-страница запрашивается. Веб-страницы PartyMS и ProductMS выдают SocketException, в то время как не подписчик PurchaseMS работает нормально. Теперь, если я закомментирую строку, в которой подписывается PartyMS, она снова начнет работать, хотя больше не получает CompanyEvent и выходит из синхронизации. Любые идеи друзей?
SocketException: невозможно установить соединение, потому что целевая машина активно отказалась от него. System.Net.Http.ConnectHelper.ConnectAsync (строковый хост, int порт, CancellationToken cancellationToken)
public void Publish<T>(T @event) where T : Event
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "xAlexa", type: ExchangeType.Fanout);
var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
var eventName = @event.GetType().Name;
channel.BasicPublish(exchange: "xAlexa",
routingKey: eventName, //string.Empty,
basicProperties: null,
body: body);
}
}
StartBasicConsume
private void StartBasicConsume<T>() where T : Event
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
DispatchConsumersAsync = true
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
var eventName = typeof(T).Name;
var msName = typeof(T).FullName;
string[] str = { };
str = msName.Split('.');
eventName += str[1];
channel.ExchangeDeclare(exchange: "xAlexa",
type: ExchangeType.Fanout);
channel.QueueDeclare(eventName, true, false, false, null); //channel.QueueDeclare().QueueName;
channel.QueueBind(queue: eventName,
exchange: "xAlexa",
routingKey: string.Empty);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(eventName, true, consumer);
Console.WriteLine("Consumer Started");
Console.ReadLine();
}
private async Task Consumer_Received(object sender, BasicDeliverEventArgs e)
{
var eventName = e.RoutingKey;
var body = e.Body.ToArray();
//var body = e.Body.Span;
var message = Encoding.UTF8.GetString(body);
//var message = Encoding.UTF8.GetString(e.Body);
Console.WriteLine(message);
try
{
await ProcessEvent(eventName, message).ConfigureAwait(false);
}
catch (Exception ex)
{
}
}
Вызов ProductsMS Api из приложения MVC (здесь он не работает, если подписан, и работает, если не подписан на CompanyEvent!)
public class ProductService:IProductService
{
private readonly HttpClient _apiCLient;
public ProductService(HttpClient apiCLient)
{
_apiCLient = apiCLient;
}
public async Task<List<Product>> GetProducts()
{
var uri = "https://localhost:5005/api/ProductApi";
List<Product> userList = new List<Product>();
HttpResponseMessage response = await _apiCLient.GetAsync(uri);
if (response.IsSuccessStatusCode)
{
var readTask = response.Content.ReadAsStringAsync().Result;
userList = JsonConvert.DeserializeObject<List<Product>>(readTask);
}
return userList;
}
}
Найдите ProductsMS Api Startup.cs ниже:
namespace Alexa.ProductMS.Api
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
var connectionString = Configuration["DbContextSettings:ConnectionString"];
var dbPassword = Configuration["DbContextSettings:DbPassword"];
var builder = new NpgsqlConnectionStringBuilder(connectionString)
{
Password = dbPassword
};
services.AddDbContext<ProductsDBContext>(opts => opts.UseNpgsql(builder.ConnectionString));
services.AddMediatR(typeof(Startup));
RegisterServices(services);
}
private void RegisterServices(IServiceCollection services)
{
DependencyContainer.RegisterServices(services);
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseHttpsRedirection();
});
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
ConfigureEventBus(app); //WORKS IF COMMENTED; FAILS OTHERWISE <---
}
private void ConfigureEventBus(IApplicationBuilder app)
{
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<CompanyEvent, CompanyEventHandler>();
eventBus.Subscribe<PartyEvent, PartyEventHandler>();
}
}
}
Также смотрите изображения:
SocketException
, потреблять больше сообщений? Что ваш код делает наConsumer_Received
? - person Bohdan Stupak   schedule 19.11.2020