Как реализовать простую многопоточность с фиксированным количеством рабочих потоков

Я ищу самый простой и простой способ реализовать следующее:

  • Основная программа создает рабочие потоки для выполнения задачи.
  • Одновременно могут выполняться только n задачи.
  • Когда достигается n, рабочие процессы больше не запускаются до тех пор, пока количество запущенных потоков не упадет ниже n.

person shsteimer    schedule 24.09.2008    source источник


Ответы (6)


Я думаю, что Исполнители. newFixedThreadPool соответствует вашим требованиям. Существует несколько различных способов использования полученного ExecutorService, в зависимости от того, хотите ли вы, чтобы результат возвращался в основной поток, или является ли задача полностью автономной, и есть ли у вас набор задач для выполнения заранее, или ставятся ли задачи в очередь в ответ на какое-то событие.

  Collection<YourTask> tasks = new ArrayList<YourTask>();
  YourTask yt1 = new YourTask();
  ...
  tasks.add(yt1);
  ...
  ExecutorService exec = Executors.newFixedThreadPool(5);
  List<Future<YourResultType>> results = exec.invokeAll(tasks);

В качестве альтернативы, если у вас есть новая асинхронная задача, которую нужно выполнить в ответ на какое-то событие, вы, вероятно, просто захотите использовать простой метод execute(Runnable) службы ExecutorService.

person erickson    schedule 24.09.2008


Используйте фреймворк Executor; а именно newFixedThreadPool(N)

person hazzen    schedule 24.09.2008

  1. Если ваша очередь задач не будет неограниченной и задачи могут выполняться за более короткие промежутки времени, вы можете использовать Executors.newFixedThreadPool(n); как предполагают специалисты.

    Единственным недостатком этого решения является неограниченный размер очереди задач. У вас нет контроля над этим. Огромное нагромождение в очереди задач снизит производительность приложения и может привести к нехватке памяти в некоторых сценариях.

  2. #P3# <блочная цитата> #P4# #P5#
  3. Я предпочитаю ThreadPoolExecutor из-за гибкости API для управления многими параметрами, которые контролируют выполнение потоковой задачи.

    ThreadPoolExecutor(int corePoolSize, 
                           int maximumPoolSize, 
                           long keepAliveTime, 
                           TimeUnit unit, 
                           BlockingQueue<Runnable> workQueue, 
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)
    

в вашем случае установите оба corePoolSize and maximumPoolSize as N. Здесь вы можете контролировать размер очереди задач, определять свою собственную фабрику потоков и политику обработчика отклонений.

Взгляните на соответствующий вопрос SE, чтобы динамически контролировать размер пула:

Динамический пул потоков

person Ravindra babu    schedule 31.01.2016

Если вы хотите свернуть самостоятельно:

private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);

private boolean roomLeft() {
    synchronized (workers) {
        return (workers.size() < MAX_WORKERS);
    }
}

private void addWorker() {
    synchronized (workers) {
        workers.add(new Worker(this));
    }
}

public void removeWorker(Worker worker) {
    synchronized (workers) {
        workers.remove(worker);
    }
}

public Example() {
    while (true) {
        if (roomLeft()) {
            addWorker();
        } 
    }
}

Где Worker — это ваш класс, расширяющий Thread. Каждый рабочий вызовет метод removeWorker этого класса, передав себя в качестве параметра, когда закончит свою работу.

С учетом сказанного фреймворк Executor выглядит намного лучше.

Редактировать: Кто-нибудь хочет объяснить, почему это так плохо, вместо того, чтобы просто изменить его?

person rjohnston    schedule 24.09.2008

Как уже упоминалось здесь, лучше всего создать пул потоков с помощью Исполнители класс:

Однако, если вы хотите свернуть свой собственный, этот код должен дать вам представление о том, как действовать дальше. По сути, просто добавьте каждый новый поток в группу потоков и убедитесь, что у вас никогда не будет более N активных потоков в группе:

Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
    if( group.activeCount()<N && i<tasks.length ) {
        new TaskThread(group, tasks[i]).start();
        i++;
    } else {
        Thread.sleep(100);
    }
}
person Eli Courtwright    schedule 24.09.2008