В настоящее время у меня есть одно приложение Service Fabric, состоящее из нескольких служб. Я пытаюсь достичь механизма очередей, чтобы одна служба могла публиковать сообщение в очереди, а другая служба могла получать сообщения из той же очереди.
Следующее не работает (для службы Listener исключать из очереди нечего):
PublisherService
:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var myQueue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("fooQueue");
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
using (var tx = this.StateManager.CreateTransaction())
{
// Put some message in the queue
await myQueue.EnqueueAsync(tx, "Foobar");
await tx.CommitAsync();
}
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}
ListenerService
:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var myQueue = await StateManager.GetOrAddAsync<IReliableQueue<string>>("fooQueue");
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
using (var tx = this.StateManager.CreateTransaction())
{
var result = await myQueue.TryDequeueAsync(tx);
if (result.HasValue)
{
ServiceEventSource.Current.ServiceMessage(this.Context, "New message receieved: {0}", result.Value.ToString());
}
await tx.CommitAsync();
}
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}
Похоже, объем очереди ограничен одной службой. Это не похоже на ограничение, указанное в документации.
Итак, мои вопросы:
- это на самом деле недокументированное ограничение?
- или что-то не так в приведенном выше коде?
- как я могу реализовать описанный выше сценарий (одна служба добавляет сообщения в очередь, другая - извлекает сообщения из той же очереди)?
Очевидно, я мог бы использовать служебную шину Azure, но не могу по нескольким причинам:
- в моем реальном сценарии у меня будет несколько очередей (переменное число), поэтому потребуется создание очередей служебной шины по запросу (что не совсем быстрая операция)
- добавляет зависимость к другой службе Azure (что увеличивает вероятность отказа для всей системы)
- стоит дороже
- более сложное развертывание
- и Т. Д.