Как использовать расширение Rx.Nex ForEachAsync с асинхронным действием

У меня есть код, который передает данные из SQL и записывает их в другое хранилище. Код примерно такой:

using (var cmd = new SqlCommand("select * from MyTable", connection))
{
     using (var reader = await cmd.ExecuteReaderAsync())
     {
         var list = new List<MyData>();
         while (await reader.ReadAsync())
         {
             var row = GetRow(reader);
             list.Add(row);
             if (list.Count == BatchSize)
             {
                 await WriteDataAsync(list);
                 list.Clear();
             }
         }
         if (list.Count > 0)
         {
             await WriteDataAsync(list);
         }
     }
 }

Вместо этого я хотел бы использовать реактивные расширения для этой цели. В идеале код должен выглядеть так:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async batch => await WriteDataAsync(batch));

Однако похоже, что метод расширения ForEachAsync принимает только синхронные действия. Можно ли написать расширение, которое будет принимать асинхронное действие?


person Mark Shamis    schedule 28.07.2017    source источник


Ответы (4)


Можно ли написать расширение, которое будет принимать асинхронное действие?

Не напрямую.

Подписки Rx обязательно синхронны, потому что Rx — это система, основанная на push-уведомлениях. Когда элемент данных поступает, он проходит через ваш запрос, пока не достигнет окончательной подписки, которая в данном случае должна выполнить Action.

Методы await, предоставляемые Rx, await формируют последовательность саму, то есть ForEachAsync являются асинхронными с точки зрения последовательности (вы асинхронно ожидаете завершения последовательности), но подписка внутри ForEachAsync ( действие, предпринимаемое для каждого элемента), должны быть синхронными.

Чтобы выполнить синхронно-асинхронный переход в конвейере данных, вам понадобится буфер. Подписка Rx может (синхронно) добавлять в буфер в качестве производителя, в то время как асинхронный потребитель извлекает элементы и обрабатывает их. Итак, вам понадобится очередь производителя/потребителя, которая поддерживает как синхронные, так и асинхронные операции.

Различные типы блоков в потоке данных TPL могут удовлетворить эту потребность. Что-то вроде этого должно быть достаточно:

var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
using (var subscription = obs.Subscribe(buffer.AsObserver()))
  await buffer.Completion;

Обратите внимание, что обратного давления нет; как только StreamDataFromSql сможет отправить данные, они будут буферизованы и сохранены во входящей очереди ActionBlock. В зависимости от размера и типа данных это может быстро использовать много памяти.

person Stephen Cleary    schedule 29.07.2017
comment
Можете ли вы объяснить, что подписки Rx обязательно синхронны, потому что Rx — это система, основанная на push-уведомлениях? На первый взгляд я бы сказал, что это неправильно, но, возможно, я неправильно понял то, что вы говорите. - person Enigmativity; 29.07.2017
comment
@Enigmativity Я имею в виду, что встроенной автоматической системы противодавления нет. Подписка в вашем ответе, например, синхронная, а не асинхронная. Как только в вашей подписке появится первый await, с точки зрения Rx весь метод подписки уже выполнен, и вы можете начать новый. - person Stephen Cleary; 29.07.2017
comment
Справедливо. Я думаю, что мне следовало ввести вызов .ObserveOn, чтобы перенести выполнение из потока пользовательского интерфейса. И затем, в зависимости от того, использует ли WriteDataAsync какие-либо элементы пользовательского интерфейса, он может просто запустить пользовательский интерфейс и таким образом избежать проблем с асинхронностью. - person Enigmativity; 30.07.2017

Для этого правильно использовать Reactive Extensions, поэтому начните с момента создания соединения до тех пор, пока не запишете свои данные.

Вот как:

IObservable<IList<MyData>> query =
    Observable
        .Using(() => new SqlConnection(""), connection =>
            Observable
                .Using(() => new SqlCommand("select * from MyTable", connection), cmd =>
                    Observable
                        .Using(() => cmd.ExecuteReader(), reader =>
                            Observable
                                .While(() => reader.Read(), Observable.Return(GetRow(reader))))))
        .Buffer(BatchSize);

IDisposable subscription =
    query
        .Subscribe(async list => await WriteDataAsync(list));

Я не мог проверить код, но он должен работать. Этот код предполагает, что WriteDataAsync тоже может принимать IList<MyData>. Если он просто не упадет в .ToList().

person Enigmativity    schedule 29.07.2017
comment
Subscribe не принимает Func<T,Task>, поэтому async и await на самом деле ничего не делают. - person ErikHeemskerk; 16.11.2020
comment
@ErikHeemskerk - это все еще Action<T>, и это работает с async/await. - person Enigmativity; 17.11.2020

Вот версия метода ForEachAsync, которая поддерживает асинхронные действия. Он проецирует наблюдаемый источник во вложенный IObservable<IObservable<Unit>>, содержащий асинхронные действия, а затем сглаживает его обратно в IObservable<Unit> с помощью Merge. Полученная наблюдаемая наконец преобразуется в задачу.

По умолчанию действия вызываются последовательно, но их можно вызывать одновременно, настроив необязательный аргумент maximumConcurrency.

Отмена необязательного аргумента cancellationToken приводит к немедленному завершению (отмене) возвращенного Task, возможно, до отмены текущих выполняемых действий.

Любое исключение, которое может возникнуть, распространяется через Task и вызывает отмену всех выполняемых в данный момент действий.

/// <summary>
/// Invokes an asynchronous action for each element in the observable sequence,
/// and returns a 'Task' that represents the completion of the sequence and
/// all the asynchronous actions.
/// </summary>
public static Task ForEachAsync<TSource>(
    this IObservable<TSource> source,
    Func<TSource, CancellationToken, Task> action,
    CancellationToken cancellationToken = default,
    int maximumConcurrency = 1)
{
    // Arguments validation omitted
    return source
        .Select(item => Observable.FromAsync(ct => action(item, ct)))
        .Merge(maximumConcurrency)
        .DefaultIfEmpty()
        .ToTask(cancellationToken);
}

Пример использования:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async (batch, token) => await WriteDataAsync(batch, token));
person Theodor Zoulias    schedule 20.11.2020

Вот исходный код ForEachAsync и статья о методах ToEnumerable и AsObservable

Мы можем создать оболочку вокруг ForEachAsync, которая будет ожидать функцию, возвращающую задачу:

public static async Task ForEachAsync<T>( this IObservable<T> t, Func<T, Task> onNext )
{
    foreach ( var x in t.ToEnumerable() )
        await onNext( x );
}

Пример использования:

await ForEachAsync( Observable.Range(0, 10), async x => await Task.FromResult( x ) );
person Jonathan Tyson    schedule 28.07.2017
comment
Очень хороший момент. Однако, ожидая, я по существу теряю преимущества использования async в реализации WriteDataAsync. Мне интересно, поддерживается ли неблокирующий характер исходного кода. - person Mark Shamis; 29.07.2017