Мне нужно получить несколько файлов параллельно. Сама операция get требует большого количества операций ввода-вывода и может значительно выиграть от параллельного выполнения.
С RxJava я смог добиться этого, заключив свою функцию в Async.toAsync
.
Мне было интересно, есть ли более чистый способ использования subscribeOn ( ) или monitoringOn () ? Я не мог этого понять. Пробовали разные способы, но в любом случае использовался только один поток, и обработка происходила последовательно.
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.util.async.Async;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ParallelMultiGet {
public List<String> readContents(List<String> paths) {
Func1<String, String> getFunction = new Func1<String, String>() {
@Override
public String call(String path) {
return get(path);
}
};
Scheduler scheduler = Schedulers.from(
Executors.newFixedThreadPool(
Math.min(paths.size(), 50)));
Future<List<String>> result = Observable
.from(paths)
.flatMap(
Async.toAsync(getFunction, scheduler))
.toList()
.toBlocking()
.toFuture();
try {
return result.get(30, TimeUnit.MINUTES);
} catch (Exception e) {
// For example if Func1.call above throws an exception it ends up in here
throw new IllegalStateException("Couldn't read paths", e);
}
}
private String get(String path) {
// this would be the slow operation, waiting for IO
return "content";
}
}
Даже это уже неплохо, потому что мне не нужно создавать свои собственные циклы, которые отправляют фьючерсы и объединяют их значения в список результатов. Но, может быть, это не должно быть так многословно?
ExecutorService
имеет.invokeAll()
- person fge   schedule 05.12.2014invokeAll()
, но этот вопрос в первую очередь о том, как это можно сделать с помощью RxJava. Было бы неплохо найти удобный способ объединения методов, а также иметь возможность использоватьObservable.from
и т. Д. Вместо повторенияpaths
вручную для создания задач дляinvokeAll()
. - person juhoautio   schedule 05.12.2014