Разница между Rx Throttle().ObserveOn(планировщик) и Throttle(, планировщик)

У меня есть следующий код:

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50), RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

Как и ожидалось, UpdateUi() всегда будет выполняться в основном потоке. Когда я меняю код на

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50))
                                       .ObserveOn(RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

UpdateUI() будет выполняться в фоновом потоке.

Почему Throttle(...).ObserveOn(scheduler) не эквивалентно Throttle(..., scheduler)?


person larsmoa    schedule 02.03.2015    source источник


Ответы (2)


После некоторого расследования я считаю, что это вызвано другой версией Rx, используемой во время выполнения, чем я ожидаю (я разрабатываю плагин для стороннего приложения).

Я не уверен, почему, но кажется, что RxApp.MainThreadScheduler по умолчанию неправильно инициализируется. Экземпляром по умолчанию является WaitForDispatcherScheduler (источник). Все функции этого класса полагаются на attemptToCreateScheduler:

    IScheduler attemptToCreateScheduler()
    {
        if (_innerScheduler != null) return _innerScheduler;
        try {
            _innerScheduler = _schedulerFactory();
            return _innerScheduler;
        } catch (Exception) {
            // NB: Dispatcher's not ready yet. Keep using CurrentThread
            return CurrentThreadScheduler.Instance;
        }
    }

Что, кажется, происходит в моем случае, так это то, что _schedulerFactory() выбрасывает, в результате чего вместо этого возвращается CurrentThreadScheduler.Instance.

При ручной инициализации поведение от RxApp.MainThreadScheduler до new SynchronizationContextScheduler(SynchronizationContext.Current) соответствует ожидаемому.

person larsmoa    schedule 03.03.2015

В обоих приведенных вами примерах кода UpdateUi будет всегда вызываться в планировщике, указанном RxApp.MainThreadScheduler. Я могу сказать это с некоторой уверенностью, поскольку ObserveOn — это декоратор, обеспечивающий вызов OnNext обработчика подписчиков в указанном планировщике. См. здесь глубинный анализ.

Итак, это немного озадачивает. Либо RxApp.MainThreadScheduler не относится к правильному планировщику диспетчера, либо UpdateUi переходит из потока диспетчера. Первое не является беспрецедентным — см. https://github.com/reactiveui/ReactiveUI/issues/768 где другие столкнулись с этим. Я понятия не имею, в чем дело было в том случае. Возможно, @PaulBetts сможет внести свой вклад, или вы могли бы поднять вопрос на https://github.com/reactiveui/. В любом случае, я бы тщательно проверил ваши предположения, так как ожидаю, что это будет хорошо проверенная область. У вас есть полная копия?

Что касается вашего конкретного вопроса, разница между Throttle(...).ObserveOn(scheduler) и Throttle(..., scheduler) заключается в следующем:

В первом случае, когда Throttle указан без планировщика, он будет использовать планировщик платформы по умолчанию, чтобы ввести параллелизм, необходимый для запуска его таймера - в WPF это будет использовать поток пула потоков. Таким образом, все регулирование будет выполняться в фоновом потоке, и из-за следующих ObserveOn подписчику будут передаваться только выпущенные события в указанном планировщике.

В случае, когда Throttle указывает планировщик, регулирование выполняется в этом планировщике — как подавленные события, так и выпущенные события будут управляться в этом планировщике, и подписчик также будет вызываться в том же планировщике.

Так что в любом случае UpdateUi будет вызываться RxApp.MainThreadScheduler.

В большинстве случаев лучше всего регулировать события пользовательского интерфейса в диспетчере, поскольку, как правило, более затратно запускать отдельные таймеры в фоновом потоке и платить за переключение контекста, если только часть событий собирается пройти через регулирование.

Итак, просто чтобы убедиться, что вы не столкнулись с проблемой с RxApp.MainThreadScheduler, я бы попробовал явно указать планировщик или SynchronizationContext другим способом. Как это сделать, будет зависеть от платформы, на которой вы работаете — надеюсь, ObserveOnDispatcher() доступна, или используйте подходящую перегрузку ObserveOn. Существуют параметры для элементов управления, контекстов синхронизации и планировщиков, если импортированы правильные библиотеки Rx.

person James World    schedule 02.03.2015