Почему IEnumerable.ToObservable такой медленный?

Я пытаюсь перечислить большой IEnumerable один раз и наблюдайте за перечислением с различными присоединенными операторами (Count, Sum, Average и т. д.). Очевидный способ — преобразовать его в IObservable с помощью метода ToObservable, а затем подпишите на него наблюдателя. Я заметил, что это намного медленнее, чем другие методы, такие как выполнение простого цикла и уведомление наблюдателя о каждой итерации или использование метода Observable.Create вместо ToObservable. Разница существенная: в 20-30 раз медленнее. Так оно и есть, или я что-то не так делаю?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Выход:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

.NET Core 3.0, C# 8, System.Reactive 4.3.2, Windows 10, консольное приложение, встроенная версия


Обновление: вот пример фактической функциональности, которую я хочу реализовать:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

Выход:

Количество: 10 000 000, Сумма: 49 999 995 000 000, Среднее значение: 4 999 999,5

Важное отличие этого подхода от использования стандартного LINQ заключается в том, что перечисляемый источник перечисляется только один раз.


Еще одно наблюдение: использование ToObservable(Scheduler.Immediate) немного быстрее (около 20%), чем ToObservable().


person Theodor Zoulias    schedule 02.04.2020    source источник
comment
Однократное измерение не слишком надежно. Рассмотрите возможность настройки эталонного показателя с помощью, например, BenchmarkDotNet. (Не аффилированный)   -  person Fildor    schedule 02.04.2020
comment
@Fildor BenchmarkDotNet более полезен для микробенчмарков или для измерения небольших различий, которые возникают с высокой изменчивостью. В этом случае, вероятно, достаточно Stopwatch.   -  person Theodor Zoulias    schedule 02.04.2020
comment
@TheodorZoulias Это еще не все, например, я бы поставил под сомнение ваш тест в его нынешнем виде, поскольку порядок выполнения в этом отдельном прогоне может вызывать большие различия.   -  person Oliver    schedule 02.04.2020
comment
Секундомера может быть достаточно, если вы собрали статистику. Не один образец.   -  person Fildor    schedule 02.04.2020
comment
@Fildor Я только что снова провел тест в обратном порядке: Method3(COUNT); Method2(COUNT); Method1(COUNT);. Я получил аналогичные результаты.   -  person Theodor Zoulias    schedule 02.04.2020
comment
Я думаю, что инициализацию перечислимого следует вынести за пределы функций ради точности.   -  person Eldar    schedule 02.04.2020
comment
Я не говорю, что вы увидите совершенно другой результат. Просто будет надежнее.   -  person Fildor    schedule 02.04.2020
comment
@Eldar Init находится за пределами измерения. (Если вы про строку var source = Enumerable.Range(0, count);)   -  person Fildor    schedule 02.04.2020
comment
@Эльдар, я только что проверил твое предложение. Я передал одно и то же перечисление всем трем методам. Результаты аналогичны.   -  person Theodor Zoulias    schedule 02.04.2020
comment
@Fildor - результаты правильные, и они ожидаемы.   -  person Enigmativity    schedule 02.04.2020
comment
@Enigmativity Я не говорю, что они НЕПРАВИЛЬНЫ. Я просто проверю достойный тест, чтобы быть уверенным.   -  person Fildor    schedule 02.04.2020
comment
@Fildor Меня не интересует, будет ли ToObservable ровно в 24,8 или 25,2 раза медленнее. Это не имеет никакого значения для моего варианта использования. В обоих случаях я склонен не использовать его, а вместо этого использовать один из других методов.   -  person Theodor Zoulias    schedule 02.04.2020
comment
@Fildor - достаточно честно. Я имею в виду, что цифры отражают то, чего следует ожидать.   -  person Enigmativity    schedule 02.04.2020
comment
@TheodorZoulias - Было бы ошибкой не использовать .ToObservable() по причинам, которые я изложил в своем ответе. Скорость здесь не цель.   -  person Enigmativity    schedule 02.04.2020
comment
@Enigmativity Согласен.   -  person Fildor    schedule 02.04.2020
comment
@TheodorZoulias - Хороший вопрос, кстати.   -  person Enigmativity    schedule 02.04.2020
comment
@TheodorZoulias - Это также одна из причин, по которой я советую избегать Observable.Create, особенно если вы в конечном итоге делаете return Disposable.Empty;.   -  person Enigmativity    schedule 02.04.2020


Ответы (2)


В этом разница между хорошо себя зарекомендовавшей наблюдаемой и наблюдаемой «самокрутись, потому что ты думаешь, что чем быстрее, тем лучше, но это не так».

Когда вы углубитесь в источник, вы обнаружите эту прекрасную строчку:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

Эффективно вызывает hasNext = enumerator.MoveNext(); один раз за запланированную рекурсивную итерацию.

Это позволяет вам выбрать планировщик для вашего .ToObservable(schedulerOfYourChoice) вызова.

С другими выбранными вами вариантами вы создали простую серию вызовов .OnNext, которые практически ничего не делают. У Method2 нет даже .Subscribe звонка.

Оба из Method2 и Method1 выполняются с использованием текущего потока и оба выполняются до завершения до завершения подписки. Они блокируют звонки. Они могут вызвать состояние гонки.

Method1 — единственная, которая хорошо ведет себя как наблюдаемая. Он асинхронный и может работать независимо от подписчика.

Имейте в виду, что наблюдаемые объекты — это коллекции, которые выполняются во времени. Обычно они имеют асинхронный источник, таймер или реакцию на внешний стимул. Они не часто убегают от простого перечисления. Если вы работаете с перечислимым, следует ожидать, что синхронная работа будет работать быстрее.

Скорость не является целью Rx. Целью является выполнение сложных запросов к временным, отправленным значениям.

person Enigmativity    schedule 02.04.2020
comment
сворачивайте сами, потому что вы думаете, что чем быстрее, тем лучше, но это не так - отлично!! - person Fildor; 02.04.2020
comment
Спасибо Enigmativity за подробный ответ! Я обновил свой вопрос примером того, чего я на самом деле хочу достичь, а именно синхронного расчета по своей природе. Считаете ли вы, что вместо расширений Reactive мне следует искать другой инструмент, учитывая, что производительность критична в моем случае? - person Theodor Zoulias; 02.04.2020
comment
@TheodorZoulias - вот множество способов сделать ваш пример в вашем вопросе: source.Aggregate(new { count = 0, sum = 0L }, (a, x) => new { count = a.count + 1, sum = a.sum + x }, a => new { a.count, a.sum, average = (double)a.sum / a.count }). Только одна итерация и более чем в 10 раз быстрее, чем Rx. - person Enigmativity; 02.04.2020
comment
Я только что протестировал его, и он действительно быстрее, но только примерно в 2 раза быстрее (по сравнению с RX без ToObservable). Это другая крайность, когда у меня лучшая производительность, но я вынужден заново реализовывать каждый оператор LINQ внутри сложного лямбда-выражения. Он подвержен ошибкам и менее удобен в сопровождении, учитывая, что в моих реальных расчетах задействовано еще больше операторов и их комбинаций. Я думаю, что довольно заманчиво платить цену производительности x2 за ясное и читаемое решение. С другой стороны платить х10 или х20 не так уж и много! - person Theodor Zoulias; 02.04.2020
comment
Возможно, если бы вы опубликовали именно то, что пытаетесь сделать, я мог бы предложить альтернативу? - person Enigmativity; 02.04.2020
comment
TBH, мой реальный случай пока находится в области воображения. ???? Я пытаюсь понять, какие инструменты больше подходят для разных сценариев, и требование одиночного перечисления возникает довольно часто. - person Theodor Zoulias; 02.04.2020
comment
@TheodorZoulias - Имейте в виду, что способ Rx с одной итерацией вызывает несколько взаимодействий субъектов. Вы вообще не удалили ни одной итерации на практике. - person Enigmativity; 02.04.2020
comment
Не могли бы вы уточнить это? Насколько я понимаю, Subject — это просто список подписчиков (наблюдателей), и каждый раз, когда он получает уведомление, он распространяет его среди своих подписчиков. Это больше, чем это? - person Theodor Zoulias; 02.04.2020
comment
@TheodorZoulias - Нет, это именно так. Скажем, вы делаете 4 вычисления, чтобы вы могли выполнить перечисление 4 раза, чтобы сделать это. Или вы можете прикрепить 4 наблюдателя к предмету и повторить перечисление один раз. В любом случае это n * 4 значений, через которые вы проходите - n раз 4наблюдателей или n раз 4 итераций. - person Enigmativity; 02.04.2020
comment
Я хочу избежать перечисления источника более одного раза, потому что данные могут не храниться в памяти. Вместо этого они могут быть извлечены по одному из файловой системы или базы данных. В этом случае каждое перечисление может даже давать разные результаты, что делает мои вычисленные значения несогласованными. - person Theodor Zoulias; 02.04.2020
comment
@TheodorZoulias - Тогда наблюдаемый подход хорош. Это эфемерный способ перебора списка элементов. Вы можете легко создать наблюдаемое непосредственно из File.ReadLines или запроса к БД и никогда не иметь значений в перечислимом в первую очередь. - person Enigmativity; 02.04.2020
comment
Да, метод File.ReadLines Я имел в виду. Однако он возвращает IEnumerable<string>. Итак, мы вернулись к вопросу о том, как эффективно сделать IObservable из IEnumerable. :-) - person Theodor Zoulias; 02.04.2020
comment
@TheodorZoulias. Попробуйте этот способ чтения файла с чистыми объектами наблюдения: dotnetfiddle.net/PB9w9W. - person Enigmativity; 03.04.2020
comment
Я попытался протестировать ваше решение Using-Defer-Repeat-TakeWhile и столкнулся с странное поведение. - person Theodor Zoulias; 03.04.2020
comment
Я только что проверил это после добавления ObserveOn(Scheduler.CurrentThread) в качестве обходного пути для вышеупомянутого странного поведения, и результаты не обещают . У него в пять раз больше накладных расходов, чем у ToObservable, который и без того медленный. - person Theodor Zoulias; 03.04.2020

Потому что Тема ничего не делает.

Похоже, что производительность оператора цикла различна для двух случаев:

for(int i=0;i<1000000;i++)
    total++;

or

for(int i=0;i<1000000;i++)
    DoHeavyJob();

Если использовать другой Subject с медленной реализацией OnNext, результат будет более приемлемым.

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 100;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    class My_Slow_Subject : SubjectBase<int>
    {

        public override void OnNext(int value)
        {
            //do a job which spend 3ms
            System.Threading.Thread.Sleep(3);
        }


        bool _disposed;
        public override bool IsDisposed => _disposed;
        public override void Dispose() => _disposed = true;
        public override void OnCompleted() { }
        public override void OnError(Exception error) { }
        public override bool HasObservers => false;
        public override IDisposable Subscribe(IObserver<int> observer) 
                => throw new NotImplementedException();
    }

    static SubjectBase<int> CreateSubject()
    {
        return new My_Slow_Subject();
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Выход

ToObservable: 434 msec
Loop & Notify: 398 msec
Observable.Create: 394 msec

Поддержка ToObservable System.Reactive.Concurrency.IScheduler

Это означает, что вы можете реализовать свой собственный IScheduler и решать, когда запускать каждую задачу.

Надеюсь это поможет

С Уважением

person BlazorPlus    schedule 02.04.2020
comment
Вы понимаете, что ОП прямо говорит о значениях COUNT в 100 000 раз выше? - person Fildor; 02.04.2020
comment
Спасибо BlazorPlus за ответ. Я обновил свой вопрос, добавив более реалистичный пример моего варианта использования. subject наблюдают другие операторы, выполняющие вычисления, поэтому он ничего не делает. Снижение производительности при использовании ToObservable по-прежнему существенно, потому что вычисления очень легкие. - person Theodor Zoulias; 02.04.2020