У меня есть ряд модулей, использующих модель публикации/подписки RX.
Вот код регистрации события (повторяется для каждого модуля подписки):
_publisher.GetEvent<DataEvent>()
.Where(sde => sde.SourceName == source.Name)
.ObserveOn(Scheduler.TaskPool)
.Subscribe(module.OnDataEvent);
Издатель прост благодаря коду Хосе Романьелло:
public class EventPublisher : IEventPublisher
{
private readonly ConcurrentDictionary<Type, object> _subjects =
new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>()
{
var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>());
return subject.AsObservable();
}
public void Publish<TEvent>(TEvent sampleEvent)
{
object subject;
if (_subjects.TryGetValue(typeof(TEvent), out subject))
{
((ISubject<TEvent>)subject).OnNext(sampleEvent);
}
}
}
Теперь моя проблема: как вы можете видеть выше, я использовал метод .ObserveOn(Scheduler.TaskPool) для выделения нового потока из пула для каждого модуля, для каждого события. Это потому, что у меня много событий и модулей. Проблема, конечно, в том, что события перепутаны во времени, так как некоторые события запускаются близко друг к другу, а затем в конечном итоге вызывают обратный вызов OnDataEvent в неправильном порядке (каждое событие OnDataEvent имеет отметку времени).
Есть ли простой способ использовать RX для обеспечения правильного порядка событий? Или я могу написать свой собственный планировщик, чтобы каждый модуль получал события последовательно?
События публикуются в правильной последовательности, разумеется.
Заранее спасибо.