Тестирование PriorityBlockingQueue в ThreadPoolExecutor

Я реализовал свой ThreadPoolExecutor с помощью PriorityBlockingQueue, как в этом примере: https://stackoverflow.com/a/12722648/2206775

и написал тест:

PriorityExecutor executorService = (PriorityExecutor)  PriorityExecutor.newFixedThreadPool(16);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 1);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 3);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 2);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("5");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 5);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("4");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 4);

    executorService.shutdown();
    try {
        executorService.awaitTermination(30, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

Но в итоге я не получаю 1 2 3 4 5, я получаю эти числа в случайном порядке. Есть проблема с тестом или еще что? И если первое, как это можно правильно протестировать?


person Dmitry Sobetsky    schedule 30.05.2013    source источник


Ответы (2)


Приоритет учитывается только в том случае, если пул полностью занят и вы отправляете несколько новых задач. Если вы определяете свой пул только с одним потоком, вы должны получить ожидаемый результат. В вашем примере все задачи выполняются одновременно, а какая из них завершается первой, несколько случайна.

Между прочим, связанная реализация имеет проблему и выдает исключение, если ваша очередь заполнена и вы отправляете новые задачи.

См. Ниже рабочий пример того, чего вы пытаетесь достичь (я переопределил newTaskFor в упрощенном виде, просто для того, чтобы это работало - вы можете улучшить эту часть).

Он печатает: 1 2 3 4 5.

public class Test {

    public static void main(String[] args) {
        PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(1);
        executorService.submit(getRunnable("1"), 1);
        executorService.submit(getRunnable("3"), 3);
        executorService.submit(getRunnable("2"), 2);
        executorService.submit(getRunnable("5"), 5);
        executorService.submit(getRunnable("4"), 4);

        executorService.shutdown();
        try {
            executorService.awaitTermination(30, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static Runnable getRunnable(final String id) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(id);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
    }

    static class PriorityExecutor extends ThreadPoolExecutor {

        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                                long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        //Utitlity method to create thread pool easily

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new PriorityExecutor(nThreads, nThreads, 0L,
                                        TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        }
        //Submit with New comparable task

        public Future<?> submit(Runnable task, int priority) {
            return super.submit(new ComparableFutureTask(task, null, priority));
        }
        //execute with New comparable task

        public void execute(Runnable command, int priority) {
            super.execute(new ComparableFutureTask(command, null, priority));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return (RunnableFuture<T>) callable;
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return (RunnableFuture<T>) runnable;
        }
    }

    static class ComparableFutureTask<T> extends FutureTask<T> implements Comparable<ComparableFutureTask<T>> {

        volatile int priority = 0;

        public ComparableFutureTask(Runnable runnable, T result, int priority) {
            super(runnable, result);
            this.priority = priority;
        }

        public ComparableFutureTask(Callable<T> callable, int priority) {
            super(callable);
            this.priority = priority;
        }

        @Override
        public int compareTo(ComparableFutureTask<T> o) {
            return Integer.valueOf(priority).compareTo(o.priority);
        }
    }
}
person assylias    schedule 30.05.2013
comment
Не могли бы вы объяснить, почему связанная реализация выдает это исключение, если мы не переопределяем оба метода newTaskFor? - person Dmitry Sobetsky; 31.05.2013
comment
Поскольку реализация по умолчанию превращает ваш ComparableFutureTask в RunnableFuture, и когда ваш PriorityQueue пытается вернуть RunnableFuture обратно в ComparableFutureTask, он получает исключение. JDK поставляется с исходным кодом, поэтому вы можете пошагово отладить его в своей среде IDE, чтобы увидеть, как это происходит вживую. - person assylias; 31.05.2013
comment
Сегодня я потратил два часа на анализ ... Если вы на 100% уверены, что не позвоните ThreadPoolExecutor#submit (только ThreadPoolExecutor#execute), вы можете пропустить весь PriorityExecutor. ThreadPoolExecutor#execute не будет заключать параметр Runnable во что-либо, что сделает его сопоставимым в PriorityBlockingQueue. В зависимости от области применения вашего ThreadPoolExecutor это может быть жизнеспособным решением. Я бы порекомендовал добавить комментарий по этому поводу ... И последнее, но не менее важное: AbstractExecutorService ужасно предназначен для этого! - person Ztyx; 25.04.2015
comment
Я пробовал этот код, он всегда запускает первую задачу без учета приоритета. Например, если вы сначала отправляете задачу с приоритетом 4, она всегда будет запускать эту задачу в начале. Кроме того, если вы увеличите размер пула потоков до 2, даже если имеется достаточно ожидающих потоков, он будет запускать их в соответствии с порядком вставки. - person Arash; 24.11.2015
comment
@Arash Это ожидается: когда отправляется первая задача, очередь задач пуста, и эта задача выполняется немедленно, потому что это единственная задача, известная пулу. - person assylias; 24.11.2015
comment
Если вы используете тот же приоритет, начальный порядок подачи не принимается во внимание! - person Daniel Hári; 15.03.2017
comment
Я сделал улучшение, чтобы сохранить порядок задач для тех же приоритетов: stackoverflow.com/a/42831172/1386911 - person Daniel Hári; 16.03.2017

У вас 16 потоков и только 5 задач, то есть все они выполняются одновременно, и приоритет на самом деле не имеет значения.

Приоритет имеет значение только тогда, когда есть задачи, ожидающие выполнения.

Чтобы показать это, если вы установите в своем примере только 1 поток, вы получите ожидаемый результат.

person Supericy    schedule 30.05.2013