Чтение из потока с использованием Observable через FromAsyncPattern, как правильно закрыть/отменить

Нужно: долгоиграющая программа с TCP соединениями

Программа C# 4.0 (VS1010, XP) должна подключаться к хосту с использованием TCP, отправлять и получать байты, иногда правильно закрывать соединение и открывать его позже. Окружающий код написан в стиле Rx.Net Observable. Объем данных невелик, но программа должна работать непрерывно (избегайте утечки памяти, заботясь о правильном размещении ресурсов).

Текст ниже длинный, потому что я объясняю, что я искал и нашел. Теперь это работает.

Общие вопросы таковы: поскольку Rx иногда не интуитивно понятен, являются ли решения хорошими? Будет ли это надежно (скажем, сможет ли оно работать годами без проблем)?

Решение до сих пор

Отправлять

Программа получает NetworkStream следующим образом:

TcpClient tcpClient = new TcpClient();
LingerOption lingerOption = new LingerOption(false, 0); // Make sure that on call to Close(), connection is closed immediately even if some data is pending.
tcpClient.LingerState = lingerOption;

tcpClient.Connect(remoteHostPort);
return tcpClient.GetStream();

Асинхронная отправка достаточно проста. Rx.Net позволяет справиться с этим с помощью гораздо более короткого и чистого кода, чем традиционные решения. Я создал специальную тему с EventLoopScheduler. Операции, требующие отправки, выражаются с помощью IObservable. Использование ObserveOn(sendRecvThreadScheduler) гарантирует, что все операции отправки выполняются в этом потоке.

sendRecvThreadScheduler = new EventLoopScheduler(
    ts =>
    {
        var thread = new System.Threading.Thread(ts) { Name = "my send+receive thread", IsBackground = true };
        return thread;
    });
        // Loop code for sending not shown (too long and off-topic).

Пока все отлично и безупречно.

Получать

Похоже, что для получения данных Rx.Net также должен позволять более короткий и чистый код, чем традиционные решения. После прочтения нескольких ресурсов (например, http://www.introtorx.com/) и stackoverflow кажется, что очень простое решение - связать асинхронное программирование с Rx.Net, как в https://stackoverflow.com/a/14464068/1429390< /а> :

public static class Ext
{
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead,
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

В основном это работает. Я могу отправлять и получать байты.

Время закрытия

Это когда что-то начинает идти не так.

Иногда мне нужно закрыть поток и сохранить чистоту. В основном это означает: прекратить чтение, прекратить прием байтов, открыть новое соединение с новым.

Во-первых, когда соединение принудительно закрывается удаленным хостом, BeginRead()/EndRead() немедленно зацикливается, потребляя весь ЦП, возвращая нулевые байты. Я позволяю коду более высокого уровня заметить это (с Subscribe() до ReadObservable в контексте, где доступны высокоуровневые элементы) и выполнить очистку (включая закрытие и удаление потока). Это тоже хорошо работает, и я позабочусь об утилизации объекта, возвращаемого Subscribe().

    someobject.readOneStreamObservableSubscription = myobject.readOneStreamObservable.Subscribe(buf =>
    {
        if (buf.Length == 0)
        {
                MyLoggerLog("Read explicitly returned zero bytes.  Closing stream.");
                this.pscDestroyIfAny();
        }
    });

Иногда мне просто нужно закрыть поток. Но, по-видимому, это должно вызывать исключения при асинхронном чтении. c# - Правильный способ преждевременного прекращения BeginRead и BeginWrite? - Переполнение стека

Я добавил CancellationToken, который заставляет Observable.While() заканчивать последовательность. Это не очень помогает избежать этих исключений, так как BeginRead() может долго спать.

Необработанное исключение в наблюдаемом объекте привело к выходу программы. Поиск предоставил .net - Продолжить использование подписки после исключения - Stack Overflow, который предложил добавить Catch, который возобновляет неработающий Observable с пустым , эффективно.

Код выглядит так:

public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize, CancellationToken token)
{
    // to hold read data
    var buffer = new byte[bufferSize];
    // Step 1: async signature => observable factory
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead,
        stream.EndRead);
    return Observable.While(
        // while there is data to be read
        () =>
        {
            return (!token.IsCancellationRequested) && stream.CanRead;
        },
        // iteratively invoke the observable factory, which will
        // "recreate" it such that it will start from the current
        // stream position - hence "0" for offset
        Observable.Defer(() =>
            {
                if ((!token.IsCancellationRequested) && stream.CanRead)
                {
                    return asyncRead(buffer, 0, bufferSize);
                }
                else
                {
                    return Observable.Empty<int>();
                }
            })
            .Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
        .Select(readBytes => buffer.Take(readBytes).ToArray()));
}

Что теперь? Вопрос

Похоже, это работает хорошо. Обнаружены условия, при которых удаленный хост принудительно закрыл соединение или просто больше недоступен, что приводит к тому, что код более высокого уровня закрывает соединение и повторяет попытку. Все идет нормально.

Я не уверен, что все кажется правильным.

Во-первых, эта строка:

.Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.

похоже на плохую практику пустого блока catch в императивном коде. Фактический код регистрирует исключение, а код более высокого уровня обнаруживает отсутствие ответа и корректно обрабатывает его, поэтому его следует считать вполне нормальным (см. ниже)?

.Catch((Func<Exception, IObservable<int>>)(ex =>
{
    MyLoggerLogException("On asynchronous read from network.", ex);
    return Observable.Empty<int>();
})) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.

Кроме того, это действительно короче, чем большинство традиционных решений.

Являются ли решения правильными или я пропустил некоторые более простые/чистые способы?

Есть ли какие-то ужасные проблемы, которые кажутся очевидными мастерам реактивных расширений?

Спасибо за внимание.


person Stéphane Gourichon    schedule 18.08.2015    source источник
comment
Готовы ли вы отказаться от Rx и использовать традиционные циклы на основе ожидания для обработки данных?   -  person usr    schedule 18.08.2015
comment
Спасибо. async/await взяты из VS2012/C#4.5, поэтому краткий ответ — нет. А можно поподробнее? async/await создает локально параллельный код, который напоминает синхронный код, который является двойным обрезной. Может ли что-то вроде this быть расширено для обработки принятия новых команд в любое время из любого потока + отправка + гарантия задержки между отправкой + получением + обнаружение потерь Логика +retry+autoreconnect просто? Текущий подход Observable + цикл обработки событий требует обучения, но выглядит явным и работает. Какой-нибудь подводный камень? Спасибо.   -  person Stéphane Gourichon    schedule 19.08.2015
comment
Вы можете использовать await с .NET 4.0 и VS12. Вы можете использовать поддельное ожидание на основе итераторов с 4.0 и 2010. Такие вещи, как гарантированная задержка между отправками, действительно звучат немного похоже на Rx, но некоторая простая логика, основанная на DateTime, должна быть в состоянии сделать это. Повторная попытка и повторное подключение не являются проблемой, с которой у await возникли бы трудности. Семантически await ничего не делает. Он просто распутывает беспорядок обратного вызова и помещает его в удобочитаемую форму, где вы можете использовать обычные конструкции потока управления.   -  person usr    schedule 19.08.2015