Обработка списка задач с интенсивным вводом-выводом параллельно с RxJava

Мне нужно получить несколько файлов параллельно. Сама операция get требует большого количества операций ввода-вывода и может значительно выиграть от параллельного выполнения.

С RxJava я смог добиться этого, заключив свою функцию в Async.toAsync.

Мне было интересно, есть ли более чистый способ использования subscribeOn ( ) или monitoringOn () ? Я не мог этого понять. Пробовали разные способы, но в любом случае использовался только один поток, и обработка происходила последовательно.

import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.util.async.Async;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ParallelMultiGet {

    public List<String> readContents(List<String> paths) {

        Func1<String, String> getFunction = new Func1<String, String>() {
            @Override
            public String call(String path) {
                return get(path);
            }
        };

        Scheduler scheduler = Schedulers.from(
                Executors.newFixedThreadPool(
                        Math.min(paths.size(), 50)));

        Future<List<String>> result = Observable
                .from(paths)
                .flatMap(
                        Async.toAsync(getFunction, scheduler))
                .toList()
                .toBlocking()
                .toFuture();
        try {
            return result.get(30, TimeUnit.MINUTES);
        } catch (Exception e) {
            // For example if Func1.call above throws an exception it ends up in here
            throw new IllegalStateException("Couldn't read paths", e);
        }
    }

    private String get(String path) {
        // this would be the slow operation, waiting for IO
        return "content";
    }

}

Даже это уже неплохо, потому что мне не нужно создавать свои собственные циклы, которые отправляют фьючерсы и объединяют их значения в список результатов. Но, может быть, это не должно быть так многословно?


person juhoautio    schedule 05.12.2014    source источник
comment
Для этого вам не нужен асинхронный режим; просто ExecutorService имеет .invokeAll()   -  person fge    schedule 05.12.2014
comment
Здесь действительно стоит упомянуть. Результатом является List ‹Future›, так что все еще нужно хотя бы повторить его и отобразить значения в список. Общая сложность меньше с invokeAll(), но этот вопрос в первую очередь о том, как это можно сделать с помощью RxJava. Было бы неплохо найти удобный способ объединения методов, а также иметь возможность использовать Observable.from и т. Д. Вместо повторения paths вручную для создания задач для invokeAll().   -  person juhoautio    schedule 05.12.2014


Ответы (1)


Чтобы быть менее многословным:

  • вы можете использовать оператор тайм-аута из RxJava
  • вы можете использовать java8 с лямбда вместо Func1 (но в вашем случае это не сильно поможет)
  • можно избежать создания собственного исполнителя

У меня есть код, который должен делать то же самое:

  public List<String> readContents(List<String> paths) {

      try {

        return Observable
                .from(paths)
                .flatMap(Async.toAsync((Func1<String, String>) this::get, Schedulers.io()))
                .toList()
                .timeout(30, TimeUnit.MINUTES)
                .toBlocking().single();
      } catch (RuntimeException ex) { // the cause will be a timeoutException
        // For example if Func1.call above throws an exception it ends up in here
        throw new IllegalStateException("Couldn't read paths", ex);
      }
    }

    private String get(String path) {
        // this would be the slow operation, waiting for IO
        return "content";
   }
person dwursteisen    schedule 05.12.2014
comment
Спасибо, почти готово. Async.toAsync по-прежнему кажется мне немного неуклюжим, но если это делается с помощью RxJava, прекрасно. Проблема с Schedulers.io() в том, что он создает бесконечное количество потоков. С другой стороны, Schedulers.computation() ограничивается 4 потоками (или это может зависеть от количества ядер). На самом деле это еще одна проблема, и было бы довольно просто расширить Schedulers, получив что-то вроде Schedulers.fixed(int size) или даже лучше Schedulers.cached(int maxSize). - person juhoautio; 05.12.2014
comment
.timeout() + .single() просто красиво. Спасибо за подсказку. - person juhoautio; 05.12.2014
comment
Вы можете использовать перегруженный flatMap (), который принимает максимальное количество одновременно подписанных наблюдаемых объектов в качестве второго параметра. - person npace; 30.09.2016