Какой исполнитель использует CompletableFuture.allOf?

Предположим, у нас есть два исполнителя, 1 и 2.

Мы можем настроить, какой исполнитель использовать при выполнении

CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(()-> {return 1;}, executor1) //executor1
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(()-> {return 2;}, executor1) //executor1
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(()-> {return 3;}, executor2) //executor2

но какой исполнитель потока использует статический метод allOf CompletableFuture?

CompletableFuture.allOf(cf1, cf2, cf3)

Спасибо!


person italktothewind    schedule 21.04.2018    source источник
comment
есть исполнитель по умолчанию Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();   -  person Andrew Tobilko    schedule 21.04.2018
comment
Исполнитель @Andrew по умолчанию здесь не участвует просто потому, что нет задачи для выполнения.   -  person Ivan Gammel    schedule 21.04.2018


Ответы (2)


Ответ Ивана Гаммеля не точен.

На самом деле нет исполнителя, связанного с CompletableFuture, возвращаемым allOf(), как на самом деле никогда не бывает исполнителя, связанного с каким-либо CompletableFuture.

задача связана с исполнителем, так как она выполняется внутри него, но связь обратная: у исполнителя есть список задач для выполнения.

Задачу также можно связать с CompletableFuture, которая будет завершена, когда задача завершится. Сам CompletableFuture не хранит ссылку на задачу или исполнителя, которые использовались для его создания. Однако он может хранить ссылки на задачи и, возможно, исполнители, используемые на зависимых этапах.

CompletableFuture, возвращенный allOf(), будет завершен задачей, которая является зависимой стадией исходных CompletableFutures. В вашем примере эта задача может быть выполнена:

  • executor1, если третья задача завершилась первой;
  • executor2, если 2 первых задания завершились раньше третьего; или
  • исходный поток, если все задачи были завершены до того, как вы вызвали allOf().

Это можно увидеть, добавив зависимый этап thenRun() к вызову allOf():

public class CompletableFutureAllOfCompletion {
    private ExecutorService executor1 = Executors.newFixedThreadPool(2);
    private ExecutorService executor2 = Executors.newFixedThreadPool(2);
    private Random random = new Random();

    public static void main(String[] args) {
        new CompletableFutureAllOfCompletion().run();
    }

    public void run() {
        CompletableFuture<Integer> cf1 = supplyAsync(this::randomSleepAndReturn, executor1);
        CompletableFuture<Integer> cf2 = supplyAsync(this::randomSleepAndReturn, executor1);
        CompletableFuture<Integer> cf3 = supplyAsync(this::randomSleepAndReturn, executor2);
        randomSleepAndReturn();
        CompletableFuture.allOf(cf1, cf2, cf3)
                .thenRun(() -> System.out.println("allOf() commpleted on "
                        + Thread.currentThread().getName()));

        executor1.shutdown();
        executor2.shutdown();
    }

    public int randomSleepAndReturn() {
        try {
            final long millis = random.nextInt(1000);
            System.out.println(
                    Thread.currentThread().getName() + " waiting for " + millis);
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 0;
    }
}

Некоторые возможные выходы:

Завершение на первом исполнителе (третья задача завершена первой):

pool-1-thread-1 waiting for 937
pool-1-thread-2 waiting for 631
main waiting for 776
pool-2-thread-1 waiting for 615
allOf() commpleted on pool-1-thread-1

Завершение на втором исполнителе (первая и вторая задача завершены раньше третьей):

pool-1-thread-1 waiting for 308
pool-1-thread-2 waiting for 788
main waiting for 389
pool-2-thread-1 waiting for 863
allOf() commpleted on pool-2-thread-1

Завершение в основном потоке (все задачи завершены до allOf().thenRun()):

pool-1-thread-1 waiting for 168
pool-1-thread-2 waiting for 292
main waiting for 941
pool-2-thread-1 waiting for 188
allOf() commpleted on main

Как управлять исполнителем, который будет использоваться после allOf() (или anyOf())

Поскольку нет никакой гарантии, что исполнитель будет использоваться, за вызовом одного из этих методов должен следовать вызов *Async(, executor) для управления тем, какой исполнитель будет использоваться.

Если вам нужно вернуть результат CompletableFuture одного из этих вызовов, просто добавьте thenApplyAsync(i -> i, executor) перед возвратом.

person Didier L    schedule 22.04.2018
comment
@ Анатолий К вашему сведению, ваше редактирование ничего не изменило в выделении кода (поскольку в вопросе уже есть тег Java, выделение применяется автоматически), однако оно сделало код с отступом в 4 пробела. Таким образом, я отменил это изменение. - person Didier L; 29.11.2020

Нет исполнителя, связанного с CompletableFuture#allOf, он просто создает CompletableFuture, который будет ждать завершения зависимостей в том же потоке, где вы вызовете CompletableFuture#get().

В вашем примере задачи позади cf1 и cf2 будут по-прежнему выполняться executor1, задача в cf2 будет выполняться executor2, результат allOf(..).get() будет возвращен в текущем потоке, и никакие дополнительные потоки не будут запущены за сценой.

Вот пример того, как вы можете наблюдать за реальным поведением в вашей IDE, установив точку останова в строке System.out.println и проверив список активных потоков.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

import static java.util.concurrent.CompletableFuture.supplyAsync;

public class ExecutorTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Executor executor1 = Executors.newSingleThreadExecutor();
        Executor executor2 = Executors.newSingleThreadExecutor();
        CompletableFuture<Integer> cf1 = supplyAsync(run(1), executor1); //executor1
        CompletableFuture<Integer> cf2 = supplyAsync(run(2), executor1); //executor1
        CompletableFuture<Integer> cf3 = supplyAsync(run(3), executor2); //executor2
        CompletableFuture<Void> result = CompletableFuture.allOf(cf1, cf2, cf3);
        new Thread(() -> {
            try {
                result.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }).start();
        System.out.println("Waiting now...");
    }

    private static Supplier<Integer> run(int result) {
        return () -> runDelayed(result);
    }

    private static int runDelayed(int result) {
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }

}
person Ivan Gammel    schedule 21.04.2018