Как проще всего распараллелить задачу в java?

Скажем, у меня есть такая задача:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

Каков самый простой способ распараллелить каждый compute () (при условии, что они уже распараллеливаются)?

Мне не нужен ответ, который строго соответствует приведенному выше коду, просто общий ответ. Но если вам нужна дополнительная информация: мои задачи связаны с вводом-выводом, и это для веб-приложения Spring, и задачи будут выполняться в HTTP-запросе.


person Eduardo    schedule 06.01.2010    source источник
comment
Должна ли вторая строка быть Result result = compute(object);?   -  person Carcigenicate    schedule 10.10.2015


Ответы (9)


Я бы рекомендовал взглянуть на ExecutorService < / а>.

В частности, примерно так:

ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);

Обратите внимание, что использование newCachedThreadPool может быть плохим, если objects - большой список. Кэшированный пул потоков может создавать поток для каждой задачи! Вы можете использовать newFixedThreadPool(n), где n - что-то разумное (например, количество ядер, которые у вас есть, при условии, что compute() привязан к ЦП).

Вот полный код, который действительно выполняется:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other exectuors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}
person overthink    schedule 06.01.2010

В Java8 и более поздних версиях вы можете использовать parallelStream в коллекции, чтобы добиться этого:

List<T> objects = ...;

List<Result> result = objects.parallelStream().map(object -> {
            return compute(object);
        }).collect(Collectors.toList());

Примечание: порядок в списке результатов может не совпадать с порядком в списке объектов.

Подробная информация о том, как настроить правильное количество потоков, доступна в этом вопросе о переполнении стека сколько-потоков-порождено-в-параллельном потоке-в-java-8

person i000174    schedule 27.06.2018
comment
На мой взгляд, это запах кода. Вы блокируете весь другой код, используя parallelStream. В тестовом или небольшом приложении все нормально, но на большом сервере это может быть рецептом катастрофы. - person user482745; 17.10.2018
comment
Потоки предназначены для параллелизма данных, а не для параллелизма задач. См. stackoverflow.com/a/23370799/208288. - person Laird Nelson; 03.11.2018

Можно просто создать несколько потоков и получить результат.

Thread t = new Mythread(object);

if (t.done()) {
   // get result
   // add result
}

РЕДАКТИРОВАТЬ: Я думаю, что другие решения круче.

person fastcodejava    schedule 06.01.2010

Вот что я использую в своих проектах:

public class ParallelTasks
{
    private final Collection<Runnable> tasks = new ArrayList<Runnable>();

    public ParallelTasks()
    {
    }

    public void add(final Runnable task)
    {
        tasks.add(task);
    }

    public void go() throws InterruptedException
    {
        final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
                .availableProcessors());
        try
        {
            final CountDownLatch latch = new CountDownLatch(tasks.size());
            for (final Runnable task : tasks)
                threads.execute(new Runnable() {
                    public void run()
                    {
                        try
                        {
                            task.run();
                        }
                        finally
                        {
                            latch.countDown();
                        }
                    }
                });
            latch.await();
        }
        finally
        {
            threads.shutdown();
        }
    }
}

// ...

public static void main(final String[] args) throws Exception
{
    ParallelTasks tasks = new ParallelTasks();
    final Runnable waitOneSecond = new Runnable() {
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
            }
        }
    };
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    final long start = System.currentTimeMillis();
    tasks.go();
    System.err.println(System.currentTimeMillis() - start);
}

Что печатает чуть больше 2000 на моем двухъядерном корпусе.

person Jonathan Feinberg    schedule 06.01.2010

Вы можете использовать ThreadPoolExecutor. Вот пример кода: http://programmingexamples.wikidot.com/threadpoolexecutor (слишком длинный, чтобы приводить его здесь)

person David Rabinowitz    schedule 06.01.2010

Одним из вариантов является параллельный массив Fork / Join

person Michael Barker    schedule 06.01.2010

Чтобы получить более подробный ответ, прочтите Параллелизм Java на практике и используйте java.util.concurrent.

person Adam Goode    schedule 06.01.2010
comment
Это должен быть приятель - person Vino; 16.10.2018

Я собирался упомянуть о классе исполнителя. Вот пример кода, который вы бы поместили в класс исполнителя.

    private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);

    private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();

    public void addCallable(Callable<Object> callable) {
        this.callableList.add(callable);
    }

    public void clearCallables(){
        this.callableList.clear();
    }

    public void executeThreads(){
        try {
        threadLauncher.invokeAll(this.callableList);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public Object[] getResult() {

        List<Future<Object>> resultList = null;
        Object[] resultArray = null;
        try {

            resultList = threadLauncher.invokeAll(this.callableList);

            resultArray = new Object[resultList.size()];

            for (int i = 0; i < resultList.size(); i++) {
                resultArray[i] = resultList.get(i).get();
            }

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return resultArray;
    }

Затем, чтобы использовать его, вы должны вызвать класс исполнителя, чтобы заполнить и выполнить его.

executor.addCallable( some implementation of callable) // do this once for each task 
Object[] results = executor.getResult();
person mkamowski    schedule 06.01.2010
comment
Меня всегда раздражало отсутствие класса-оболочки для набора заданий. - person Alexander Torstling; 06.01.2010

Удобный способ - использовать ExecutorCompletionService.

Скажем, у вас есть следующий код (как в вашем примере):

 public static void main(String[] args) {
    List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
    List<List<Character>> list = new ArrayList<>();

    for (char letter : letters) {
      List<Character> result = computeLettersBefore(letter);
      list.add(result);
    }

    System.out.println(list);
  }

  private static List<Character> computeLettersBefore(char letter) {
    return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
  }

Теперь, чтобы выполнять задачи параллельно, все, что вам нужно сделать, это создать ExecutorCompletionService, поддерживаемый пулом потоков. Затем отправляйте задачи и читайте результаты. Поскольку ExecutorCompletionService использует LinkedBlockingQueue под капотом, результаты становятся доступными для получения, как только они становятся доступными (если вы запустите код, вы заметите, что порядок результатов является случайным):

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(3);
    final ExecutorCompletionService<List<Character>> completionService = new ExecutorCompletionService<>(threadPool);

    final List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
    List<List<Character>> list = new ArrayList<>();

    for (char letter : letters) {
      completionService.submit(() -> computeLettersBefore(letter));
    }

    // NOTE: instead over iterating over letters again number of submitted tasks can be used as a base for loop
    for (char letter : letters) {
      final List<Character> result = completionService.take().get();
      list.add(result);
    }

    threadPool.shutdownNow(); // NOTE: for safety place it inside finally block 

    System.out.println(list);
  }

  private static List<Character> computeLettersBefore(char letter) {
    return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
  }
person walkeros    schedule 22.09.2020