Параллельные вызовы базы данных с использованием потоков Java 8 и CompletableFuture

Я хотел бы воспроизвести и распараллелить следующее поведение с потоками Java 8:

for (animal : animalList) {
        // find all other animals with the same breed
        Collection<Animal> queryResult = queryDatabase(animal.getBreed());

        if (animal.getSpecie() == cat) {
            catList.addAll(queryResult);
        } else {
            dogList.addAll(queryResult);
        }
}

Это то, что у меня есть до сих пор

final Executor queryExecutor =
        Executors.newFixedThreadPool(Math.min(animalList.size(), 10),
                new ThreadFactory(){
                    public Thread newThread(Runnable r){
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        return t;
                    }
                });

List<CompletableFuture<Collection<Animal>>> listFutureResult =  animalList.stream()
        .map(animal -> CompletableFuture.supplyAsync(
                () -> queryDatabase(animal.getBreed()), queryExecutor))
        .collect(Collectors.toList());

List<Animal> = listFutureResult.stream()
        .map(CompletableFuture::join)
        .flatMap(subList -> subList.stream())
        .collect(Collectors.toList());

1 - Я не знаю, как разделить поток, чтобы получить 2 разных списка животных, один для кошек и один для собак.

2 - это решение выглядит разумным?


person user3768533    schedule 22.06.2016    source источник


Ответы (1)


Во-первых, рассмотрите возможность использования

List<Animal> result = animalList.parallelStream()
    .flatMap(animal -> queryDatabase(animal.getBreed()).stream())
    .collect(Collectors.toList());

даже если это не даст вам желаемого параллелизма до десяти. Простота может компенсировать это. Что касается другой части, это так же просто, как

Map<Boolean,List<Animal>> result = animalList.parallelStream()
    .flatMap(animal -> queryDatabase(animal.getBreed()).stream())
    .collect(Collectors.partitioningBy(animal -> animal.getSpecie() == cat));
List<Animal> catList = result.get(true), dogList = result.get(false);

Если у вас есть больше видов, чем просто кошки и собаки, вы можете использовать Collectors.groupingBy(Animal::getSpecie), чтобы получить карту от видов до списка животных.


Если вы настаиваете на использовании собственного пула потоков, можно улучшить несколько вещей:

Executor queryExecutor = Executors.newFixedThreadPool(Math.min(animalList.size(), 10),
    r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });
List<Animal> result =  animalList.stream()
    .map(animal -> CompletableFuture.completedFuture(animal.getBreed())
        .thenApplyAsync(breed -> queryDatabase(breed), queryExecutor))
    .collect(Collectors.toList()).stream()
    .flatMap(cf -> cf.join().stream())
    .collect(Collectors.toList());

Ваш вариант supplyAsync требовал захвата реального экземпляра Animal и создания нового Supplier для каждого животного. Напротив, функция, переданная в thenApplyAsync, является инвариантной, выполняя одну и ту же операцию для каждого значения параметра. Приведенный выше код предполагает, что getBreed — дешевая операция, иначе было бы нетрудно передать экземпляр Animal в completedFuture и вместо этого выполнить getBreed() с асинхронной функцией.

.map(CompletableFuture::join) можно заменить простой цепочкой .join() внутри функции flatMap. В противном случае, если вы предпочитаете ссылки на методы, вы должны использовать их последовательно, то есть .map(CompletableFuture::join).flatMap(Collection::stream).

Конечно, этот вариант также позволяет использовать partitioningBy вместо toList.


И последнее замечание: если вы вызываете shutdown в службе-исполнителе после использования, нет необходимости помечать потоки как демон:

ExecutorService queryExecutor=Executors.newFixedThreadPool(Math.min(animalList.size(),10));
Map<Boolean,List<Animal>> result =  animalList.stream()
    .map(animal -> CompletableFuture.completedFuture(animal.getBreed())
        .thenApplyAsync(breed -> queryDatabase(breed), queryExecutor))
    .collect(Collectors.toList()).stream()
    .flatMap(cf -> cf.join().stream())
    .collect(Collectors.partitioningBy(animal -> animal.getSpecie() == cat));
List<Animal> catList = result.get(true), dogList = result.get(false);
queryExecutor.shutdown();
person Holger    schedule 23.06.2016
comment
Привет, Хольгер, спасибо за подробное и ясное объяснение :) Я пытаюсь понять разницу в поведении между моей реализацией (которая использует поток для получения списка CompletableFutures, а затем другой поток для получения значений из CompletableFutures) и вашей реализацией, которая выполняет .join() в том же потоке. Как вы думаете, вы можете дать мне краткое объяснение? Также, если я хочу сделать updateDatabase() вместо queryDatabase(), по-прежнему рекомендуется использовать .map(...) или мне следует использовать .forEach()? updateDatabase() ничего не вернет (пусто) - person user3768533; 24.06.2016
comment
Нет никакой разницы; Я просто пропустил промежуточную локальную переменную. Он по-прежнему состоит из двух операций Stream с временным List. Если вы хотите обернуть updateDatabase() в CompletableFuture, чтобы иметь возможность дождаться его завершения, map по-прежнему подходит. - person Holger; 24.06.2016
comment
Я просто снова думаю о поведении, и я немного сбит с толку. Если в потоке больше элементов, чем количество потоков, как мы можем выполнить первую часть операции с потоком? животное -> CompletableFuture.completedFuture(animal.getBreed()).thenApplyAsync(breed -> queryDatabase(breed), queryExecutor)) разве у нас не закончатся потоки перед прохождением каждого элемента? Что происходит тогда? - person user3768533; 15.07.2016
comment
Это зависит от конкретного исполнителя. Поэтому, когда мы используем Executors.newFixedThreadPool(…), мы должны учитывать его документация: «Создает пул потоков, который повторно использует фиксированное количество потоков, работающих из общей неограниченной очереди. В любой момент не более nThreads потоков будут активными задачами обработки. Если дополнительные задачи отправляются, когда все потоки активны, они будут ждать в очереди, пока поток не станет доступным». - person Holger; 15.07.2016
comment
Блестяще, спасибо. Что это означает для второй части операции .flatMap(cf -> cf.join().stream())? Он также ставится в очередь, чтобы выполняться после запуска потока? Я предполагаю, что путаница заключается в том, что кажется, что часть второго потока должна быть запущена (например, .join() ) до того, как первый поток завершится, что странно, потому что два потока разделены .collect - person user3768533; 15.07.2016