Rebus - шина с транспортом sql в обратном вызове MemoryCache

У меня есть обработчик сообщений, который накапливает сообщения в MemoryCache за заданное время, так что будет обрабатываться только последнее.

Когда происходит обратный вызов, я хочу перенаправить другое сообщение обработчику с использованием транспорта sql, но соединение sql теперь закрыто.

Код выглядит примерно так:

public IBus SqlBus { get; set; }

public async Task Handle(ServiceMessage message)
{
    await base.Handle(() =>
    {
        cache.Set(CacheKey, message, new CacheItemPolicy()
        {
            AbsoluteExpiration = DateTimeOffset.Now.AddSeconds(10),
            RemovedCallback = new CacheEntryRemovedCallback(CacheCallback),
        });

        return Task.FromResult(0);
    }, message);
}

private void CacheCallback(CacheEntryRemovedArguments arguments)
{
    if (arguments.RemovedReason == CacheEntryRemovedReason.Expired)
    {
        var message = arguments.CacheItem.Value as ServiceMessage;
        SqlBus.Send(new AnotherMessage()).GetAwaiter().GetResult();
    }
}

Есть ли подходы, которые позволят мне это сделать?


person Daniel    schedule 19.01.2017    source источник


Ответы (1)


Когда вызывается метод CacheCallback и в каком потоке?

Мне кажется, проблема в том, что поток, вызывающий CacheCallback, имеет значение в AmbientTransactionContext.Current, где Rebus подключает операции очереди, когда это возможно.

Если контекст транзакции каким-то образом сохранился, даже если обработчик завершил выполнение, то связанные кэшированные элементы (например, SqlConnection и SqlTransaction, связанные с транспортом SQL) будут закрыты.

person mookid8000    schedule 19.01.2017
comment
CacheCallback вызывается, когда истекает срок действия или когда другое сообщение вставляется в System.Runtime.Caching.MemoryCache. Он вызывается в другом потоке. - person Daniel; 19.01.2017
comment
Рабочий поток, я не знаю внутренней работы MemoryCache: S - person Daniel; 19.01.2017