Обработка обратного давления в FixedThreadPool

Как справиться с обратным давлением в Java с помощью пула потоков?

Как отклонить новые задачи, чтобы было отправлено не более N задач. N - максимально допустимое количество задач в очереди на отправку, включая новые, запущенные, приостановленные (не завершенные) задачи.

Вариант использования

Пользователи отправляют расчетные задачи, которые выполняются в течение некоторого времени. Иногда так много пользователей отправляют задачи одновременно. Как отклонить новые задачи, если уже отправлено N задач.

Другими словами, общее количество отправленных задач (не завершенных, начатых или не начатых) не может превышать N.

Пример кода

Вот полная версия, а ниже приведены короткие фрагменты.

Долгоиграющая задача. Расчетная задача.

public class CalculationTask {
    public CalculationTask(final String name) {
        this.name = name;
    }

    public CalculationResult calculate() {
        final long waitTimeMs = MIN_WAIT_TIME_MS + RANDOM.nextInt(MAX_WAIT_TIME_MS);
        sleep(waitTimeMs);
        final int result = Math.abs(RANDOM.nextInt());
        final String text = "This is result: " + result;
        final CalculationResult calculationResult = new CalculationResult(name, text, result);
        System.out.println("Calculation finished: " + calculationResult);
        return calculationResult;
    }
}

Его результат. Результат расчета.

public class CalculationResult {

    private final String taskName;
    private final String text;
    private final Integer number;
    // Getters, setters, constructor, toString.
}

Я так отправляю вакансии. Расчетный брокер.

public class CalculationBroker {

    private static final int MAX_WORKERS_NUMBER = 5;

    private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_WORKERS_NUMBER);
    private final Map<String, CalculationResult> calculationCache = new ConcurrentHashMap<>();

    public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) {
        final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName());
        if (calculationResultCached != null) {
            return CompletableFuture.completedFuture(calculationResultCached);
        }

        System.out.println("Calculation submitted: " + calculationTask.getName());

        final CompletableFuture<CalculationResult> calculated = CompletableFuture
                .supplyAsync(calculationTask::calculate, executorService);
        calculated.thenAccept(this::updateCache);
        return calculated;
    }

    private void updateCache(final CalculationResult calculationResult) {
        calculationCache.put(calculationResult.getTaskName(), calculationResult);
    }
}

И вот как я запускаю их вместе. Основной.

public class Main {

    public static void main(String[] args) {
        final int N_TASKS = 100;
        final CalculationBroker calculationBroker = new CalculationBroker();
        final List<CompletableFuture<CalculationResult>> completableFutures = new ArrayList<>();
        for (int i = 0; i < N_TASKS; i++) {
            final CalculationTask calculationTask = createCalculationTask(i);
            final CompletableFuture<CalculationResult> calculationResultCompletableFuture =
                    calculationBroker.submit(calculationTask);
            completableFutures.add(calculationResultCompletableFuture);
        }

        calculationBroker.close();
    }

    private static CalculationTask createCalculationTask(final int counter) {
        return new CalculationTask("CalculationTask_" + counter);
    }
}

Это выход.

2020-05-23 14:14:53 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_97.
2020-05-23 14:14:53 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_98.
2020-05-23 14:14:53 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_99.
2020-05-23 14:14:54 [pool-1-thread-3] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1081871544', number=1081871544, durationMs=1066}
2020-05-23 14:14:55 [pool-1-thread-1] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 1942553785', number=1942553785, durationMs=1885}
2020-05-23 14:14:56 [pool-1-thread-5] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_4', text='This is result: 104326011', number=104326011, durationMs=2120}
20

Мои выводы.

Подробнее

Код выше эквивалентен Executors.newFixedThreadPool(n), однако вместо неограниченной LinkedBlockingQueue по умолчанию мы используем ArrayBlockingQueue с фиксированной емкостью 100. Это означает, что если 100 задач уже поставлены в очередь (и n выполняются), новая задача будет отклонена с RejectedExecutionException .

ThreadPoolExecutor использует LinkedBlockingQueue, который по умолчанию не ограничен.

Как следует из сообщения выше:

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);

person Yan Khonski    schedule 23.05.2020    source источник
comment
Просто чтобы прояснить ситуацию ... вы не хотите иметь возможность отправлять или вы хотите, чтобы ваши потоки не начинали обработку новых отправленных вычислений, пока у вас не будет свободного потока ?? Примечание. Оба варианта возможны только для того, чтобы предоставить правильное решение.   -  person Hussein Akar    schedule 23.05.2020
comment
Хороший вопрос, спасибо, я обновил свой вопрос. Я хочу отклонить новые задачи, поэтому общее количество отправленных задач (в очереди под капотом) меньше или равно N.   -  person Yan Khonski    schedule 23.05.2020
comment
чтобы использовать обратное давление, используйте протокол реактивных потоков.   -  person Alexei Kaigorodov    schedule 24.05.2020
comment
Хорошо бы увидеть пример, @AlexeiKaigorodov   -  person Yan Khonski    schedule 24.05.2020


Ответы (2)


Вы сами ответили на свой вопрос... для этого можно использовать размер Queue..

int poolSize = ...;
int queueSize = ...;
CustomRejectedExecutionHandler handler = new CustomRejectedExecutionHandler();

ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(queueSize),
    handler);

Вы можете использовать CustomRejectedExecutionHandler для обработки отклоненных потоков.

import java.util.concurrent.ThreadPoolExecutor;

import org.apache.log4j.Logger;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    public static final Logger LOGGER = Logger.getLogger(CustomRejectedExecutionHandler.class);

    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
        LOGGER.error(runnable.toString() + " execution rejected.");
    }
}
person Hussein Akar    schedule 23.05.2020
comment
Имеет смысл. Я не видел RejectedExecutionHandler . - person Yan Khonski; 23.05.2020
comment
Хотя есть один недостаток. В моем случае, если задача отклоняется (это происходит с небольшим числом queueSize, исключение не генерируется, поэтому в журналах нет предупреждений или ошибок). Как клиент узнает, что его задача была отклонена? - person Yan Khonski; 23.05.2020
comment
Обновил мой ответ, чтобы вы могли обрабатывать отклоненные потоки. Обратите внимание, что при расчете высоты threadPoolSize должно быть максимально = доступным процессорам, иначе процессор случайным образом начнет обрабатывать часть потока 1.. остановите его... начните с потока 5.. ... остановите его, затем вернитесь к потоку 1 и продолжите выполнение ... (этот сценарий, если 4 процессора) то же самое применяется для меньшего или большего количества доступных процессоров в процессоре. с другой стороны, если ваш поток вызывает БД или внешний API, нет проблем с созданием 100 в параллельном режиме, пусть служба выполнения обрабатывает то, как они работают - person Hussein Akar; 23.05.2020

Спасибо Хусейну и также этот ответ и документация. Получилось так.

Вы можете проверить полные источники.

    private final ExecutorService executorService = initializeThreadPoolWithRejection();

    private ExecutorService initializeThreadPoolWithRejection() {
        final RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

        return new ThreadPoolExecutor(WORKERS_NUMBER, MAX_WORKERS_NUMBER,
                0L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(10 /*queueSize*/),
                handler);
    }

Обратите внимание, я использую ThreadPoolExecutor.AbortPolicy();, потому что по умолчанию он терпит неудачу с исключением ExecutionException.

Брокер вычислений

    public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) 
    {
        final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName());
        if (calculationResultCached != null) {
            return CompletableFuture.completedFuture(calculationResultCached);
        }

        LOGGER.info("Calculation submitted: {}.", calculationTask.getName());

        try {
            final CompletableFuture<CalculationResult> calculated = CompletableFuture
                    .supplyAsync(calculationTask::calculate, executorService);
            calculated.thenAccept(this::updateCache);
            return calculated;
        } catch (Exception e) {
            System.out.println("Failed to submit a task.");
            return CompletableFuture.failedFuture(e);
        }
    }

Пример использования в Main:

    private static void completeFuture(final CompletableFuture<CalculationResult> future) {
        final CalculationResult calculationResult;
        try {
            calculationResult = future.get();
            System.out.println("Task is finished: " + calculationResult);
        } catch (InterruptedException e) {
            System.out.println("Task was interrupted. " + e.getMessage());
        } catch (ExecutionException e) {
            System.out.println("Task failed.");
        }
    }

Он производит вывод:

2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_15.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_16.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_17.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_20.
Failed to submit a task.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_21.
Failed to submit a task.
2020-05-23 16:44:09 [main] INFO  c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_22.
Failed to submit a task.
2020-05-23 16:44:11 [pool-1-thread-8] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_17', text='This is result: 1096770940', number=1096770940, durationMs=1246}
2020-05-23 16:44:11 [pool-1-thread-4] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_3', text='This is result: 2103177010', number=2103177010, durationMs=1814}
2020-05-23 16:44:12 [pool-1-thread-6] INFO  c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_15', text='This is result: 961885863', number=961885863, durationMs=2632}
2
Task is finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 79356259', number=79356259, durationMs=3875}
Task is finished: CalculationResult{taskName='CalculationTask_1', text='This is result: 532289460', number=532289460, durationMs=3725}
Task is finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1579151336', number=1579151336, durationMs=3684}
Task failed.
Task failed.
Task failed.

Обратите внимание, это работает только в Java 9+.

CompletableFuture.failedFuture(e); не работает в Java 8.

person Yan Khonski    schedule 23.05.2020