Как я могу создать класс, который одновременно является Task‹T› и IObservable‹T›?

Недавно я столкнулся с ситуацией, когда было бы выгодно иметь асинхронную операцию, представленную как Task<T>, так и IObservable<T>. Представление задачи поддерживает состояние операции (IsCompleted, IsFaulted и т. д.), в то время как наблюдаемое представление позволяет интересным образом комбинировать несколько операций (Concat, Merge, Switch и т. д.), автоматически обрабатывая отмену любой операции, подписка на которую была отменена. кстати, решая таким образом проблему незабытых асинхронных операций. Поэтому меня заинтересовали способы объединения этих двух представлений.

Простым и, вероятно, правильным способом их объединения была бы композиция: создание типа, который хранит внутри Task<T> и IObservable<T> и предоставляет их как два своих свойства. Но в этом вопросе меня интересует сложная и, вероятно, непрактичная возможность типа, который является Task<T> и является IObservable<T> одновременно. Тип, который можно передать напрямую API-интерфейсам, которые принимают либо задачи, либо наблюдаемые объекты, и в любом случае делают правильные вещи. Так что это не может быть просто объект, похожий на задачу. Он должен наследоваться от реального объекта, самого класса Task<T>. Что-то вроде этого:

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
    {
        //...
    }
}

Создание экземпляра AsyncOperation должно немедленно вызвать указанное действие. Другими словами, AsyncOperation должно представлять комбинацию горячих задач/наблюдаемых.

Можно ли создать такой тип?

Кстати, вот тема в библиотеке ReactiveX/RxJava, которая доказывает, что другие уже думали об этой проблеме раньше: Нет методов isCompleted или isErrored в Observable


person Theodor Zoulias    schedule 14.11.2020    source источник
comment
Пожалуйста, покажите свои попытки, которые вы пробовали, и проблемы/сообщения об ошибках, которые вы получаете в результате ваших попыток. Расширение с Task<T> не выглядит проблематичным (это не sealed), но, может быть, вы можете написать, в чем проблема (желательно с MCVE).   -  person Progman    schedule 14.11.2020
comment
@Progman проблема в том, что все общедоступные конструкторы Task<T> создают задачи делегирования, а асинхронный метод создает задачу в стиле обещания. Конструкторы, связанные с задачами в стиле обещаний, являются частными или внутренними.   -  person Theodor Zoulias    schedule 14.11.2020
comment
Позвольте мне (цитировать][docs.microsoft.com/en-us/previous-versions/dotnet/ документация по RX. Вам не нужно самостоятельно реализовывать интерфейсы IObservable‹T›/IObserver‹T›. Rx обеспечивает внутреннюю реализацию этих интерфейсов для вас и предоставляет их через различные методы расширения, предоставляемые типами Observable и Observer.   -  person Filip Cordas    schedule 15.11.2020
comment
@FilipCordas конечно. Моя проблема в том, что встроенные реализации интерфейса IObservable<T>, например возвращаемое значение метода Observable.StartAsync, не наследуются от класса Task<T>. Поэтому я не могу использовать функциональность задачи для наблюдаемого, представляющего одну асинхронную операцию. И этот функционал нужен в некоторых сценариях. Например, поток выполнения может зависеть от того, успешно ли завершена операция или нет.   -  person Theodor Zoulias    schedule 15.11.2020
comment
@TheodorZoulias Опять же, я не вижу необходимости в этом ни в какой ситуации. Я никогда не слышал о реальной необходимости наследовать от Task‹T›, я был удивлен, что это не было запечатано. Вы можете создать задачу из наблюдаемого, а можете создать наблюдаемое из задачи. Таким образом, швы будут несуществующей проблемой.   -  person Filip Cordas    schedule 16.11.2020
comment
@FilipCordas да, вы, безусловно, можете создать наблюдаемую и задачу, которые представляют одну и ту же асинхронную операцию, а затем должны носить их обе с собой, чтобы каждый раз использовать любое представление, которое вам нужно. Это то, что я сделал в своем ответе на это вопрос, и я не очень доволен его внешним видом.   -  person Theodor Zoulias    schedule 16.11.2020
comment
@FilipCordas К вашему сведению, оператор Observable.StartAsync вызывает внутри метод расширения Task.ToObservable, который возвращает упакованный экземпляр внутреннего SlowTaskObservable. Этот класс хранит базовую задачу, созданную при вызове Observable.StartAsync, но у меня нет никакого способа получить эту задачу (кроме использования приемов отражения). Моя проблема не существовала бы, если бы эта задача была доступна!   -  person Theodor Zoulias    schedule 16.11.2020


Ответы (1)


Я нашел способ создать наблюдаемую, которая наследуется от Task, используя гениальную технику, описанную @GlennSlayden в этот ответ.

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    private readonly IObservable<TResult> _observable;
    private readonly Task<TResult> _promise;

    private AsyncOperation(Func<TResult> function) : base(() => function())
        => function = this.GetResult;

    private TResult GetResult() => _promise.GetAwaiter().GetResult();

    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
        : this((Func<TResult>)null)
    {
        _observable = Observable.StartAsync(action, Scheduler.Immediate);
        _promise = _observable.ToTask();
        _promise.ContinueWith(_ => base.RunSynchronously(), default,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
        => _observable.Subscribe(observer);
}

Приведенное выше решение не является совершенным, поскольку экземпляр производного класса никогда не сможет перейти в Canceled состояние. Это проблема, которую я не знаю, как исправить, и, возможно, она не поправима, но, вероятно, это не очень важно. Отмена появляется как TaskCanceledException, и обработка этого исключения в любом случае является нормальным способом работы с отмененными задачами.

Интересно, что асинхронную операцию можно отменить, создав фиктивную подписку и удалив ее:

var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });

operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken

Я немного поэкспериментировал с этим классом и обнаружил, что он менее практичен, чем я думал изначально. Проблема в том, что существует множество API, которые поддерживают как задачи, так и наблюдаемые, и в остальном идентичны (например, Concat, Merge, Switch, Wait и т. д.). Это приводит к частому появлению ошибок времени компиляции (CS0121 неоднозначный вызов). Разрешение двусмысленности возможно путем приведения типа либо к задаче, либо к наблюдаемому, но это неудобно и в первую очередь сводит на нет всю цель объединения двух типов.


Пояснение. Строка _promise.GetAwaiter().GetResult() может на первый взгляд указывать на то, что данная реализация блокирует поток ThreadPool. Это не так, потому что база Task изначально холодная и нагревается только после завершения _promise.

person Theodor Zoulias    schedule 16.11.2020