Как опросить состояние с помощью реактивных расширений?

Уже есть хороший вопрос об опросе базы данных с использованием Reactive (опрос базы данных с помощью Reactive Extensions)

У меня аналогичный вопрос, но с изюминкой: мне нужно передать значение из предыдущего результата в следующий запрос. В принципе, я хотел бы опросить это:

interface ResultSet<T>
{
   int? CurrentAsOfHandle {get;}
   IList<T> Results {get;}
}

Task<ResultSet<T>> GetNewResultsAsync<T>(int? previousRequestHandle);

Идея состоит в том, что это возвращает все новые элементы с момента предыдущего запроса.


  1. каждую минуту я хотел бы позвонить GetNewResultsAsync
  2. Я хотел бы передать CurrentAsOf из предыдущего вызова в качестве аргумента для параметра previousRequest
  3. следующий вызов GetNewResultsAsync должен фактически произойти через одну минуту после предыдущего

В принципе, есть ли лучший способ, чем:

return Observable.Create<IMessage>(async (observer, cancellationToken) =>
{
    int? currentVersion = null;

    while (!cancellationToken.IsCancellationRequested)
    {
        MessageResultSet messageResultSet = await ReadLatestMessagesAsync(currentVersion);

        currentVersion = messageResultSet.CurrentVersionHandle;

        foreach (IMessage resultMessage in messageResultSet.Messages)
            observer.OnNext(resultMessage);

        await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
    }
});

Также обратите внимание, что эта версия позволяет собирать messageResultSet во время ожидания следующей итерации (например, я подумал, что, возможно, я мог бы использовать Scan для передачи предыдущего объекта набора результатов в следующую итерацию).


person Mark Sowul    schedule 27.01.2017    source источник


Ответы (2)


Ваш вопрос в основном сводится к следующему: есть функция Scan с подписью:

IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
     TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator)

Но вам нужно что-то вроде

IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, 
     TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator)

...где функция-аккумулятор возвращает наблюдаемое, а функция сканирования автоматически уменьшает его для передачи следующему вызову.

Вот функциональная реализация бедняка Scan:

public static IObservable<TAccumulate> MyScan<TSource, TAccumulate>(this IObservable<TSource> source, 
    TAccumulate initialValue, Func<TAccumulate, TSource, TAccumulate> accumulator)
{
    return source
        .Publish(_source => _source
            .Take(1)
            .Select(s => accumulator(initialValue, s))
            .SelectMany(m => _source.MyScan(m, accumulator).StartWith(m))
        );
}

Учитывая это, мы можем немного изменить его, чтобы включить функцию сокращения:

public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source,
    TAccumulate initialValue, Func<TAccumulate, TSource, IObservable<TAccumulate>> accumulator)
{
    return source
        .Publish(_source => _source
            .Take(1)
            .Select(s => accumulator(initialValue, s))
            .SelectMany(async o => (await o.LastOrDefaultAsync())
                .Let(m => _source
                    .MyObservableScan(m, accumulator)
                    .StartWith(m)
                )
            )
            .Merge()
        );
}

//Wrapper to accommodate easy Task -> Observable transformations
public static IObservable<TAccumulate> MyObservableScan<TSource, TAccumulate>(this IObservable<TSource> source,
    TAccumulate initialValue, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
{
    return source.MyObservableScan(initialValue, (a, s) => Observable.FromAsync(() => accumulator(a, s)));  
}

//Function to prevent re-evaluation in functional scenarios
public static U Let<T, U>(this T t, Func<T, U> selector)
{
    return selector(t);
}

Теперь, когда у нас есть этот причудливый оператор MyObservableScan, мы можем относительно легко решить вашу проблему:

var o = Observable.Interval(TimeSpan.FromMinutes(1))
    .MyObservableScan<long, ResultSet<string>>(null, (r, _) => Methods.GetNewResultsAsync<string>(r?.CurrentAsOfHandle))

Обратите внимание, что при тестировании я заметил, что если функция аккумулятора Task/Observable занимает больше времени, чем интервалы в источнике, наблюдаемое прервется. Я не уверен, почему. Если кто-то может исправить, очень обязан.

person Shlomo    schedule 29.01.2017

С тех пор я обнаружил, что существует перегрузка Observable.Generate, которая в значительной степени помогает. Главный недостаток в том, что он не работает с async.

public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler);

Я передаю null как свое начальное состояние. Передайте x => true как мое условие (опрашивать бесконечно). Внутри iterate я выполняю фактический опрос на основе переданного состояния. Затем в timeSelector я возвращаю интервал опроса.

So:

var resultSets = Observable.Generate<ResultSet<IMessage>, IEnumerable<IMessage>>(
   //initial (empty) result
   new ResultSet<IMessage>(),

   //poll endlessly (until unsubscription)
   rs => true,

   //each polling iteration
   rs => 
   {
      //get the version from the previous result (which could be that initial result)
      int? previousVersion = rs.CurrentVersionHandle;

      //here's the problem, though: it won't work with async methods :(
      MessageResultSet messageResultSet = ReadLatestMessagesAsync(currentVersion).Result;

      return messageResultSet;
   },

   //we only care about spitting out the messages in a result set
   rs => rs.Messages, 

   //polling interval
   TimeSpan.FromMinutes(1),

   //which scheduler to run each iteration 
   TaskPoolScheduler.Default);

return resultSets
  //project the list of messages into a sequence
  .SelectMany(messageList => messageList);
person Mark Sowul    schedule 24.02.2017
comment
Еще один небольшой недостаток заключается в том, что весь набор результатов сохраняется до следующей итерации. Версия в вопросе позволяет собирать мусор для части «сообщения», поскольку нам нужна только «версия». - person Mark Sowul; 24.02.2017