Прерывание выполнения параллельного потока

Рассмотрим этот код:

Thread thread = new Thread(() -> tasks.parallelStream().forEach(Runnable::run));

tasks — это список Runnable, которые должны выполняться параллельно. Когда мы запускаем этот поток, и он начинает свое выполнение, то в зависимости от некоторых вычислений нам нужно прервать (отменить) все эти задачи.

Прерывание потока остановит только одно из действий. Как мы обращаемся с другими? или, может быть, Streams не следует использовать таким образом? или вы знаете лучшее решение?


person vach    schedule 16.06.2014    source источник
comment
Я не понимаю. Есть ExecutorService.invokeAll с понятным способом отмены созданных задач. Почему вы настаиваете на использовании чего-то, что не предназначено для предоставления такой функции? В tasks.parallelStream().forEach(Runnable::run) нет ничего более читаемого, чем в executor.invokeAll(tasks)   -  person Holger    schedule 16.06.2014
comment
Поскольку код, который запускает это, иногда должен выполнять синхронизированное выполнение задачи с использованием tasks.stream() вместо параллельного потока, единственное, что сейчас я использую потоки, это то, что они более читабельны и понятны для тех, кто просматривает код. .. Но, вероятно, вы правы, и они не предназначены для использования так, как я их использую.   -  person vach    schedule 17.06.2014
comment
Вы можете использовать однопоточный ExecutorService или даже реализовать ExecutorService с запуском на месте, что очень просто при создании подклассов AbstractExecutorService. При этом вы получаете последовательное выполнение с поддержкой отмены бесплатно…   -  person Holger    schedule 17.06.2014


Ответы (3)


На самом деле вы не запускаете Runnables в потоке, который создаете. Вы запускаете поток, который будет отправлен в пул, поэтому:

Thread thread = new Thread(() -> tasks.parallelStream().forEach(Runnable::run));

В этом примере вы в меньших терминах делаете

List<Runnable> tasks = ...;
Thread thread = new Thread(new Runnable(){
    public void run(){
       for(Runnable r : tasks){
          ForkJoinPool.commonPool().submit(r);
       }
    }
});

Это связано с тем, что вы используете parallelStream, который делегирует общий пул при обработке параллельных исполнений.

Насколько я знаю, вы не можете получить дескриптор потоков, выполняющих ваши задачи, с помощью parallelStream, поэтому вам может не повезти. Вы всегда можете делать хитрые вещи, чтобы получить нить, но, вероятно, это не лучшая идея.

person John Vint    schedule 16.06.2014

Вы можете использовать ForkJoinPool для прерывания потоков:

@Test
public void testInterruptParallelStream() throws Exception {
    final AtomicReference<InterruptedException> exc = new AtomicReference<>();

    final ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    // use the pool with a parallel stream to execute some tasks
    forkJoinPool.submit(() -> {
        Stream.generate(Object::new).parallel().forEach(obj -> {
            synchronized (obj) {
                try {
                    // task that is blocking
                    obj.wait();
                } catch (final InterruptedException e) {
                    exc.set(e);
                }
            }
        });
    });

    // wait until the stream got started
    Threads.sleep(500);
    // now we want to interrupt the task execution
    forkJoinPool.shutdownNow();
    // wait for the interrupt to occur
    Threads.sleep(500);
    // check that we really got an interruption in the parallel stream threads
    assertTrue(exc.get() instanceof InterruptedException);
}

Рабочие потоки действительно прерываются, завершая операцию блокировки. Вы также можете позвонить shutdown() из Consumer.

Обратите внимание, что эти сны могут быть не настроены для правильного модульного теста, у вас могут быть лучшие идеи, чтобы просто подождать по мере необходимости. Но достаточно показать, что он работает.

person benez    schedule 22.01.2018
comment
Я проверяю, что это решение работает, но кажется плохой идеей каждый раз создавать и останавливать новый пул потоков, верно? Есть ли лучшее решение? Я пытался использовать Future#cancel как этот Future‹?› task = forkJoinPool.submit(() -› { // здесь параллельный поток }); задача.отмена(правда); но это не работает почему-то - person Vitaly Tsvetkoff; 31.01.2020
comment
@VitalyTsvetkoff, если вы собираетесь прервать конкретное выполнение параллельно выполняемых задач, то вы не хотите, чтобы этот процесс был связан с какими-либо другими потоками или пулами, поэтому создание собственного ForkJoinPool - лучший способ убедиться, что у вас нет побочные эффекты. и если выполнение не занимает достаточно много времени, чтобы оправдать создание собственного пула, это может быть не очень хорошим вариантом использования параллельного потока, который должен повысить производительность. - person benez; 04.03.2021

Что-то вроде следующего должно работать для вас:

AtomicBoolean shouldCancel = new AtomicBoolean();
...
tasks.parallelStream().allMatch(task->{
    task.run();
    return !shouldCancel.get();
});

В документации к методу allMatch конкретно указано, что он не может оценивать предикат для всех элементов, если это не необходимо для определения результата. Поэтому, если предикат не совпадает, когда вы хотите отменить, ему больше не нужно оценивать. Кроме того, вы можете проверить возвращаемый результат, чтобы увидеть, был ли цикл отменен или нет.

person BrainStorm.exe    schedule 07.08.2020