Я просмотрел источник ToObservable
и выработал минимальную реализацию. Он воспроизводит поведение, которое мы наблюдаем.
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
ToObservableEx(enumerable, CurrentThreadScheduler.Instance);
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
Observable.Create<T>
(
observer =>
{
IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
{
if (enumerator.MoveNext())
{
observer.OnNext(enumerator.Current);
inner.Schedule(enumerator, loopRec); //<-- culprit
}
else
{
observer.OnCompleted();
}
// ToObservable.cs Line 117
// We never allow the scheduled work to be cancelled.
return Disposable.Empty;
}
return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
}
);
С этим не по пути - суть проблемы заключается в поведении CurrentThreadScheduler
, который используется планировщиком по умолчанию.
Поведение CurrentThreadScheduler
заключается в том, что если расписание уже запущено во время вызова Schedule
, оно оказывается в очереди.
CurrentThreadScheduler.Instance.Schedule(() =>
{
CurrentThreadScheduler.Instance.Schedule(() =>
Console.WriteLine(1)
);
Console.WriteLine(2);
});
Это печатает 2 1
. Это поведение в очереди - наша погибель.
Когда вызывается observer.OnCompleted()
, это заставляет Concat
начать следующее перечисление - однако все не так, как когда мы начинали - мы все еще находимся внутри блока observer => { }
, когда пытаемся запланировать следующее. Поэтому вместо немедленного выполнения следующее расписание ставится в очередь.
Теперь enumerator.MoveNext()
попал в тупик. Он не может перейти к следующему элементу — MoveNext
блокируется до тех пор, пока не прибудет следующий элемент, который может прибыть только по расписанию цикла ToObservable
.
Но планировщик может работать только для уведомления ToEnumerable
, а затем MoveNext()
, который задерживается - после выхода из loopRec
- чего он не может, потому что он в первую очередь заблокирован MoveNext
.
Дополнение
Примерно так ToEnumerable
(из GetEnumerator.cs) делает (недопустимая реализация):
public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
{
var gate = new SemaphoreSlim(0);
var queue = new ConcurrentQueue<T>();
using(observable.Subscribe(
value => { queue.Enqueue(value); gate.Release(); },
() => gate.Release()))
while (true)
{
gate.Wait(); //this is where it blocks
if (queue.TryDequeue(out var current))
yield return current;
else
break;
}
}
Ожидается, что перечисления будут блокироваться до тех пор, пока не будет получен следующий элемент, и именно поэтому существует реализация стробирования. Блокирует не Enumerable.Range
, а ToEnumerable
.
person
Asti
schedule
06.04.2020
.ToObservable(Scheduler.Default)
. Это работает с вашим кодом. Мне нужно будет потратить больше времени на это, чтобы объяснить вам причину. - person Enigmativity   schedule 05.04.2020enumerable.ToObservable()
снова запускает перечисление. Подобноforeach
вызовам перечислимого, снова запустите перечисляемое. Проблема здесь в тупиковой ситуации, вызванной планировщикомScheduler.Immediate
. - person Enigmativity   schedule 05.04.2020Scheduler.CurrentThread
из всех статических планировщиков. Так что я думаю, это значение по умолчанию (когдаToObservable
вызывается без аргумента). - person Theodor Zoulias   schedule 06.04.2020