Во-первых, рассмотрите возможность использования
List<Animal> result = animalList.parallelStream()
.flatMap(animal -> queryDatabase(animal.getBreed()).stream())
.collect(Collectors.toList());
даже если это не даст вам желаемого параллелизма до десяти. Простота может компенсировать это. Что касается другой части, это так же просто, как
Map<Boolean,List<Animal>> result = animalList.parallelStream()
.flatMap(animal -> queryDatabase(animal.getBreed()).stream())
.collect(Collectors.partitioningBy(animal -> animal.getSpecie() == cat));
List<Animal> catList = result.get(true), dogList = result.get(false);
Если у вас есть больше видов, чем просто кошки и собаки, вы можете использовать Collectors.groupingBy(Animal::getSpecie)
, чтобы получить карту от видов до списка животных.
Если вы настаиваете на использовании собственного пула потоков, можно улучшить несколько вещей:
Executor queryExecutor = Executors.newFixedThreadPool(Math.min(animalList.size(), 10),
r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
List<Animal> result = animalList.stream()
.map(animal -> CompletableFuture.completedFuture(animal.getBreed())
.thenApplyAsync(breed -> queryDatabase(breed), queryExecutor))
.collect(Collectors.toList()).stream()
.flatMap(cf -> cf.join().stream())
.collect(Collectors.toList());
Ваш вариант supplyAsync
требовал захвата реального экземпляра Animal
и создания нового Supplier
для каждого животного. Напротив, функция, переданная в thenApplyAsync
, является инвариантной, выполняя одну и ту же операцию для каждого значения параметра. Приведенный выше код предполагает, что getBreed
— дешевая операция, иначе было бы нетрудно передать экземпляр Animal
в completedFuture
и вместо этого выполнить getBreed()
с асинхронной функцией.
.map(CompletableFuture::join)
можно заменить простой цепочкой .join()
внутри функции flatMap
. В противном случае, если вы предпочитаете ссылки на методы, вы должны использовать их последовательно, то есть .map(CompletableFuture::join).flatMap(Collection::stream)
.
Конечно, этот вариант также позволяет использовать partitioningBy
вместо toList
.
И последнее замечание: если вы вызываете shutdown
в службе-исполнителе после использования, нет необходимости помечать потоки как демон:
ExecutorService queryExecutor=Executors.newFixedThreadPool(Math.min(animalList.size(),10));
Map<Boolean,List<Animal>> result = animalList.stream()
.map(animal -> CompletableFuture.completedFuture(animal.getBreed())
.thenApplyAsync(breed -> queryDatabase(breed), queryExecutor))
.collect(Collectors.toList()).stream()
.flatMap(cf -> cf.join().stream())
.collect(Collectors.partitioningBy(animal -> animal.getSpecie() == cat));
List<Animal> catList = result.get(true), dogList = result.get(false);
queryExecutor.shutdown();
person
Holger
schedule
23.06.2016