Почему повторяющийся блок преобразования Enumerable to Observable

Это довольно познавательный вопрос из любопытства. Рассмотрим следующий фрагмент:

var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();

Где SubscribeDebug подписывается простой наблюдатель:

public class DebugObserver<T> : IObserver<T>
{
    public void OnCompleted()
    {
        Debug.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Debug.WriteLine("Error");
    }

    public void OnNext(T value)
    {
        Debug.WriteLine("Value: {0}", value);
    }
}

Результат этого:

Значение: 0

Значение: 1

Значение: 2

Стоимость: 3

Стоимость: 4

А потом блокирует. Может ли кто-нибудь помочь мне понять основную причину, почему это происходит и почему наблюдаемое не завершается? Я заметил, что он завершается без вызова Concat, но блокируется с ним.


person Martin Zikmund    schedule 05.04.2020    source источник
comment
Существует ли такое поведение, когда вы объединяете другую наблюдаемую, которая уже завершена?   -  person Progman    schedule 05.04.2020
comment
Ваш код создает тупик из-за используемого планировщика. Попробуйте вместо этого: .ToObservable(Scheduler.Default). Это работает с вашим кодом. Мне нужно будет потратить больше времени на это, чтобы объяснить вам причину.   -  person Enigmativity    schedule 05.04.2020
comment
@Progman - Ты на ложном пути. Каждая подписка на enumerable.ToObservable() снова запускает перечисление. Подобно foreach вызовам перечислимого, снова запустите перечисляемое. Проблема здесь в тупиковой ситуации, вызванной планировщиком Scheduler.Immediate.   -  person Enigmativity    schedule 05.04.2020
comment
Похоже, проблема не в Scheduler.Immediate, потому что, когда я передаю его в ToObservable(), оба перечисления повторяются. Однако при вызове без какой-либо реализации планировщика код блокируется.   -  person Oguz Ozgul    schedule 05.04.2020
comment
@OguzOzgul блокирует только Scheduler.CurrentThread из всех статических планировщиков. Так что я думаю, это значение по умолчанию (когда ToObservable вызывается без аргумента).   -  person Theodor Zoulias    schedule 06.04.2020


Ответы (1)


Я просмотрел источник ToObservable и выработал минимальную реализацию. Он воспроизводит поведение, которое мы наблюдаем.

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
        ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
        Observable.Create<T>
        (
            observer =>
            {
                IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
                {
                    if (enumerator.MoveNext()) 
                    {
                        observer.OnNext(enumerator.Current);
                        inner.Schedule(enumerator, loopRec); //<-- culprit
                    }
                    else
                    {
                        observer.OnCompleted();
                    }

                    // ToObservable.cs Line 117
                    // We never allow the scheduled work to be cancelled. 
                    return Disposable.Empty;
                }

                return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
            }
        );

С этим не по пути - суть проблемы заключается в поведении CurrentThreadScheduler, который используется планировщиком по умолчанию.

Поведение CurrentThreadScheduler заключается в том, что если расписание уже запущено во время вызова Schedule, оно оказывается в очереди.

        CurrentThreadScheduler.Instance.Schedule(() =>
        {
            CurrentThreadScheduler.Instance.Schedule(() =>
                Console.WriteLine(1)
            );

            Console.WriteLine(2);
        });

Это печатает 2 1. Это поведение в очереди - наша погибель.

Когда вызывается observer.OnCompleted(), это заставляет Concat начать следующее перечисление - однако все не так, как когда мы начинали - мы все еще находимся внутри блока observer => { }, когда пытаемся запланировать следующее. Поэтому вместо немедленного выполнения следующее расписание ставится в очередь.

Теперь enumerator.MoveNext() попал в тупик. Он не может перейти к следующему элементу — MoveNext блокируется до тех пор, пока не прибудет следующий элемент, который может прибыть только по расписанию цикла ToObservable.

Но планировщик может работать только для уведомления ToEnumerable, а затем MoveNext(), который задерживается - после выхода из loopRec - чего он не может, потому что он в первую очередь заблокирован MoveNext.

Дополнение

Примерно так ToEnumerable (из GetEnumerator.cs) делает (недопустимая реализация):

    public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
    {
        var gate = new SemaphoreSlim(0);
        var queue = new ConcurrentQueue<T>();

        using(observable.Subscribe(
            value => { queue.Enqueue(value); gate.Release(); }, 
            () => gate.Release()))
        while (true)
        {
            gate.Wait(); //this is where it blocks                

            if (queue.TryDequeue(out var current))
                yield return current;
            else
                break;
        }
    }

Ожидается, что перечисления будут блокироваться до тех пор, пока не будет получен следующий элемент, и именно поэтому существует реализация стробирования. Блокирует не Enumerable.Range, а ToEnumerable.

person Asti    schedule 06.04.2020
comment
Но я реализовал пользовательский IEnumerable‹int› и вернул пользовательский IEnumerator‹int›, и я вижу, что когда итерация по первому перечислителю завершается, снова вызывается GetEnumerator() и (я возвращаю новый), но MoveNext() никогда не вызывается. - person Oguz Ozgul; 06.04.2020
comment
Я должен уточнить - важен не ваш собственный IEnumerable, а тот, который вернул Observable.ToEnumerable(). Вот и блокирует. - person Asti; 06.04.2020
comment
Хорошо, тогда все. (Примечание: +1 уже перед моим комментарием), я изучал тот же конструктор, используя !! ильдасм !! ослеп и сдался. Надо было скачать рефлектор :) - person Oguz Ozgul; 06.04.2020
comment
О, и источник, конечно же, на GitHub.. Бедный я. - person Oguz Ozgul; 06.04.2020
comment
Ха-ха. Я обновил свой ответ. Я написал ToEnumerable, но он был в основном нефункциональным - просто иллюстративный, поэтому я решил не публиковать. - person Asti; 06.04.2020
comment
Это превосходный ответ! Дает много информации о том, как наблюдаемые объекты работают в синхронной (однопоточной) среде. - person Theodor Zoulias; 06.04.2020
comment
@TheodorZoulias Спасибо! Проблема стала намного яснее, когда я реализовал ToObservable - трассировки стека Rx довольно нечитаемы. Написание упрощенных Rx-операторов для понимания их поведения — хороший опыт обучения. :) - person Asti; 06.04.2020
comment
Невероятный! Спасибо за такой подробный ответ! - person Martin Zikmund; 06.04.2020
comment
Потрясающий ответ! - person Enigmativity; 07.04.2020
comment
Должен признаться, я немного озадачен тем, что планировщик по умолчанию, участвующий в этих операциях, небезопасен. Это то, что нужно исправить в библиотеке, или есть какие-то рекомендации, чтобы избежать подобных взаимоблокировок? - person NickL; 10.04.2020
comment
@NickL С текущими настройками по умолчанию библиотека довольно производительна. Это утечка абстракций параллелизма Rx, поэтому нам пришлось вернуться к источнику. В большинстве случаев использования вы никогда не столкнетесь с этими пограничными случаями в идиоматическом Rx. - person Asti; 16.04.2020