Исполнитель ForkJoinPool по умолчанию занимает много времени

Я работаю с CompletableFuture для асинхронного выполнения потока, созданного из источника списка.

поэтому я тестирую перегруженный метод, то есть SupplyAsync CompletableFuture, в котором один метод принимает только один параметр поставщика, а другой принимает параметр поставщика и параметр исполнителя. Вот документация для обоих:

один

SupplyAsync (поставщик-поставщик)

Возвращает новый CompletableFuture, который асинхронно завершается задачей, запущенной в ForkJoinPool.commonPool(), со значением, полученным путем вызова данного Supplier.

второй

SupplyAsync (поставщик-поставщик, исполнитель-исполнитель)

Возвращает новый CompletableFuture, который асинхронно завершается задачей, запущенной в данном исполнителе, со значением, полученным путем вызова данного Supplier.

И вот мой тестовый класс:

public class TestCompleteableAndParallelStream {

    public static void main(String[] args) {
        List<MyTask> tasks = IntStream.range(0, 10)
                .mapToObj(i -> new MyTask(1))
                .collect(Collectors.toList());
        
        useCompletableFuture(tasks);
        
        useCompletableFutureWithExecutor(tasks);

    }
    
    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
          long start = System.nanoTime();
          ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                   .collect(Collectors.toList());
         
          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
          executor.shutdown();
        }
    
    public static void useCompletableFuture(List<MyTask> tasks) {
          long start = System.nanoTime();
          List<CompletableFuture<Integer>> futures =
              tasks.stream()
                   .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
                   .collect(Collectors.toList());
         
          List<Integer> result =
              futures.stream()
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());
          long duration = (System.nanoTime() - start) / 1_000_000;
          System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
          System.out.println(result);
        }
    
    

}


class MyTask {
      private final int duration;
      public MyTask(int duration) {
        this.duration = duration;
      }
      public int calculate() {
        System.out.println(Thread.currentThread().getName());
        try {
          Thread.sleep(duration * 1000);
        } catch (final InterruptedException e) {
          throw new RuntimeException(e);
        }
        return duration;
      }
    }

в то время как метод useCompletableFuture занимает около 4 секунд, метод useCompletableFutureWithExecutor занимает всего 1 секунду.

Нет, мой вопрос: какую другую обработку выполняет ForkJoinPool.commonPool(), которая может выполнять накладные расходы? Разве мы не должны всегда отдавать предпочтение пользовательскому пулу исполнителей, а не ForkJoinPool?




Ответы (2)


Проверьте ForkJoinPool.commonPool() размер. По умолчанию создается пул размером

Runtime.getRuntime().availableProcessors() - 1

Я запускаю ваш пример на своем Intel i7-4800MQ (4 ядра + 4 виртуальных ядра), и размер общего пула в моем случае равен 7, поэтому все вычисления заняли ~ 2000 мс:

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Во втором случае вы использовали

Executors.newFixedThreadPool(Math.min(tasks.size(), 10));

поэтому в пуле есть 10 потоков, готовых к выполнению вычислений, поэтому все задачи выполняются за ~1000 мс:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Разница между ForkJoinPool и ExecutorService

Евгений в своем комментарии упомянул еще одну важную вещь. ForkJoinPool использует метод кражи работы:

ForkJoinPool отличается от других видов ExecutorService в основном за счет использования кражи работы: все потоки в пуле пытаются найти и выполнить задачи, отправленные в пул и/или созданные другими активными задачами (в конечном итоге блокируя ожидание работы, если таковой не существует). Это обеспечивает эффективную обработку, когда большинство задач порождают другие подзадачи (как это происходит с большинством ForkJoinTasks), а также когда множество небольших задач отправляется в пул от внешних клиентов. ForkJoinPools, особенно при установке для asyncMode значения true в конструкторах, также может подходить для использования с задачами в стиле событий, которые никогда не объединяются.

в то время как ExecutorService создан с помощью .newFixedThreadPool()< /a> использует подход «разделяй и властвуй».

Как определить размер пула?

Был вопрос о том, какой лучший размер пула потоков, вы можете найти полезную информацию там:

Установка идеального размера пула потоков

Также эта тема является хорошим местом для расследования:

Пользовательский пул потоков в параллельном потоке Java 8

person Szymon Stepniak    schedule 02.08.2017
comment
@SzymonStepniak вам действительно нужно хорошо разогреть виртуальную машину, чтобы сделать какие-либо разумные выводы о скорости, а этот код этого не делает. Также плохо иметь больше потоков, чем фактические процессоры (даже виртуальные). - person Eugene; 02.08.2017
comment
Не могу не согласиться. Мой ответ ограничивается только объяснением того, почему KayV увидел разницу в поведении commonPool по сравнению с пулом фиксированного размера из 10 потоков. Я бы не советовал использовать больше потоков, чем количество процессоров. - person Szymon Stepniak; 02.08.2017
comment
@SzymonStepniak также основная идея того, как работают эти пулы, совершенно различна - один разделяет работу (разделяй и властвуй), а другой ворует работу - совершенно разные реализации - person Eugene; 02.08.2017
comment
Следует подчеркнуть, что использование большего количества потоков, чем количество ядер, очень помогает в этом надуманном примере, где задание состоит из Thread.sleep. Для реальных задач параллелизм общего пула по умолчанию может быть более разумным. Кража работы здесь не имеет значения, так как, в конце концов, рабочие потоки в любом случае просто обрабатывают поставленные в очередь задачи. Ни один из них не использует здесь принцип «разделяй и властвуй». - person Holger; 02.08.2017
comment
@Holger Я имел в виду общий подход ForkJoinPool против Executor, а не только здесь. И на самом деле я только читал об этом и поэтому помню термины здесь: stackoverflow.com/questions/7926864/ - person Eugene; 02.08.2017
comment
@Eugene: Я предлагаю вам перечитать там сообщения. Во-первых, «воровство работы» и «разделяй и властвуй» не противоречат друг другу, на самом деле, оба они обычно относятся к модели F/J. Поскольку другие исполнители пула потоков, такие как созданный с помощью newFixedThreadPool(), имеют только одну очередь, они всегда выполняют своего рода «кражу работы», хотя не имеет особого смысла называть это так, поскольку вам нужны локальные очереди для назвать это фактическим воровством. С другой стороны, «разделяй и властвуй» — это стратегия решения проблем, которую вы можете реализовать на любом исполнителе, но предпочтительно на F/J. - person Holger; 03.08.2017
comment
@Holger Я перечитаю это; действительно кажется, что я что-то пропустил - person Eugene; 03.08.2017

Дальнейшая проверка решений в Интернете показала, что мы можем изменить размер пула по умолчанию, который принимает ForkJoinPool, используя следующие свойства:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

Таким образом, это свойство может дополнительно помочь в более эффективном использовании ForkJoinPool и с большим параллелизмом.

person KayV    schedule 02.08.2017
comment
КайВ, хорошая мысль. Это было подробно объяснено здесь stackoverflow.com/a/21172732/2194470 - person Szymon Stepniak; 02.08.2017