NServiceBus с очередями хранилища Azure будет бесконечно долго обрабатывать сообщения без заголовков.

Я экспериментирую с новым проектом NServiceBus, используя очереди службы хранилища Azure для передачи сообщений и сериализации JSON. Я заметил, что когда я запускаю сообщение через очередь, в которой отсутствуют заголовки NServiceBus, например, пустое сообщение JSON: { }, оно выдает следующее предупреждающее сообщение:

2020-02-06 17:46:35.587 WARN  NServiceBus.Transport.AzureStorageQueues.MessagePump Azure Storage Queue transport failed pushing a message through pipeline
System.ArgumentNullException: Value cannot be null.
Parameter name: nativeMessageId
   at NServiceBus.Transport.IncomingMessage..ctor(String nativeMessageId, Dictionary`2 headers, Byte[] body)
   at NServiceBus.Transport.ErrorContext..ctor(Exception exception, Dictionary`2 headers, String transportMessageId, Byte[] body, TransportTransaction transportTransaction, Int32 immediateProcessingFailures)
   at NServiceBus.Transport.AzureStorageQueues.ReceiveStrategy.CreateErrorContext(MessageRetrieved retrieved, MessageWrapper message, Exception ex, Byte[] body)
   at NServiceBus.Transport.AzureStorageQueues.AtLeastOnceReceiveStrategy.<Receive>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.AzureStorageQueues.MessagePump.<InnerReceive>d__7.MoveNext()

После этого он перестает обрабатывать сообщение, но оставляет его в очереди. Затем, после ожидания настроенного периода невидимости сообщения, сообщение снова становится видимым в очереди, и NServiceBus будет бесконечно повторять процесс «предупреждение и остановка обработки». Есть ли способ изменить способ, которым NServiceBus обрабатывает этот сценарий, чтобы он выбрасывал сообщение в настроенную очередь ошибок, когда он не может анализировать информацию заголовка и не пытаться обрабатывать сообщение бесконечно?


person Beau    schedule 06.02.2020    source источник


Ответы (1)


Транспорт NServiceBus Storage Queues предполагает, что сообщения поступают с правильным конвертом. Если этот конверт не найден, вы получите исключение, которое вы видите выше. Сообщения, созданные не с помощью NServiceBus или с пользовательским конвертом, см. в документации здесь. Короче говоря, вам нужен пользовательский распаковщик конвертов.

Пользовательский распаковщик (обратный вызов) отвечает за десериализацию сообщения и создание MessageWrapper, с которым NServiceBus должен работать.

var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();

transport.UnwrapMessagesWith(cloudQueueMessage =>
{
    using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
    using (var streamReader = new StreamReader(stream))
    using (var textReader = new JsonTextReader(streamReader))
    {
        //try deserialize to a NServiceBus envelope first
        var wrapper = jsonSerializer.Deserialize<MessageWrapper>(textReader);

        if (wrapper.Id != null)
        {
            //this was a envelope message
            return wrapper;
        }

        //this was a native message just return the body as is with no headers
        return new MessageWrapper
        {
            Id = cloudQueueMessage.Id,
            Headers = new Dictionary<string, string>(),
            Body = cloudQueueMessage.AsBytes
        };
    }
});
person Sean Feldman    schedule 06.02.2020