Реактивные расширения — вызов асинхронных событий и подписка на определенные потоки

У меня есть ряд модулей, использующих модель публикации/подписки RX.

Вот код регистрации события (повторяется для каждого модуля подписки):

_publisher.GetEvent<DataEvent>()                 
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool)
    .Subscribe(module.OnDataEvent);  

Издатель прост благодаря коду Хосе Романьелло:

public class EventPublisher : IEventPublisher
{
    private readonly ConcurrentDictionary<Type, object> _subjects = 
        new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>() 
    { 
        var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>()); 
        return subject.AsObservable(); 
    }
    public void Publish<TEvent>(TEvent sampleEvent)
    {
        object subject; 
        if (_subjects.TryGetValue(typeof(TEvent), out subject)) 
        { 
            ((ISubject<TEvent>)subject).OnNext(sampleEvent); 
        }
    }
}

Теперь моя проблема: как вы можете видеть выше, я использовал метод .ObserveOn(Scheduler.TaskPool) для выделения нового потока из пула для каждого модуля, для каждого события. Это потому, что у меня много событий и модулей. Проблема, конечно, в том, что события перепутаны во времени, так как некоторые события запускаются близко друг к другу, а затем в конечном итоге вызывают обратный вызов OnDataEvent в неправильном порядке (каждое событие OnDataEvent имеет отметку времени).

Есть ли простой способ использовать RX для обеспечения правильного порядка событий? Или я могу написать свой собственный планировщик, чтобы каждый модуль получал события последовательно?

События публикуются в правильной последовательности, разумеется.

Заранее спасибо.


person Gizz    schedule 08.09.2011    source источник


Ответы (2)


Попробуйте использовать эту реализацию EventPublisher:

public class EventPublisher : IEventPublisher
{
    private readonly EventLoopScheduler _scheduler = new EventLoopScheduler(); 
    private readonly Subject<object> _subject = new Subject<object>();

    public IObservable<TEvent> GetEvent<TEvent>() 
    { 
        return _subject
            .Where(o => o is TEvent)
            .Select(o => (TEvent)o)
            .ObserveOn(_scheduler);
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        _subject.OnNext(sampleEvent);
    }
}

Он использует EventLoopScheduler, чтобы гарантировать, что все события происходят по порядку и в одном и том же фоновом потоке.

Удалите ObserveOn из своей подписки, потому что, если вы наблюдаете за другим потоком, вы рискуете, что события снова будут происходить в неправильном порядке.

Это решает вашу проблему?

person Enigmativity    schedule 08.09.2011
comment
Чтобы сохранить исходный пример, вы можете иметь поток для каждого наблюдателя, выполнив наблюдение за битом и объявив планировщик в модуле подписки, где он был изначально, просто используя eventloopscheduler для каждого модуля вместо пула задач. - person ForbesLindesay; 08.09.2011
comment
Всем спасибо! Попробую и отчитаюсь! - person Gizz; 09.09.2011

Попробуйте метод синхронизации, например:

_publisher.GetEvent<DataEvent>()                 
    .Where(sde => sde.SourceName == source.Name) 
    .ObserveOn(Scheduler.TaskPool).Synchronize()
    .Subscribe(module.OnDataEvent);

Хотя я попробовал ваш сценарий с тем же кодом и обнаружил, что данные поступают последовательно и не перекрываются. Возможно, это что-то конкретное для вашего приложения.

person Ankur    schedule 08.09.2011
comment
Метод Synchronize не делает ничего, кроме гарантии того, что базовый наблюдаемый объект соответствует наблюдаемому контракту, то есть OnNext*(OnError|OnCompleted). - person Enigmativity; 08.09.2011