Производитель-Потребитель с ExecutorService.newFixedThreadPool - Сколько потоков создано?

public class MainClass {

private static final int producerPoolSize = 10;
private static final int consumerPoolSize = 20;    

private ExecutorService prodExec = Executors.newFixedThreadPool(producerPoolSize);
private ExecutorService consExec = Executors.newFixedThreadPool(consumerPoolSize);

//main method here, which calls start() below

private void start(String[] args) {

    // Get list of ids, split them in to n(producerPoolSize) chunks

    for (int index = 0; index < producerPoolSize; index++) {
        Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
        prodExec.execute(producer);
    }

}

public class Producer implements Runnable {

    private ExecutorService consExec;
    private List<Long> list;

    public Producer(ExecutorService exec, List<Long> list) {
        this.consExec = exec;
        this.list = list;
    }

    public void run() {

       for (Long id: list) {
          data = get data from db for the id
          consExec.execute(new Consumer(data));
       }
    }
 }

 public class Consumer implements Runnable {

     public void run() {
        // call web service
     }
 }

В приведенном выше коде у меня есть два пула потоков - по одному для производителей и потребителей. Я получаю несколько идентификаторов из базы данных, разделяю их на равные части, чтобы они были переданы потокам-производителю для обработки. Поток-производитель получает список идентификаторов и обрабатывает каждый последовательно, извлекая данные для каждого из идентификаторов и отправляя эти данные в поток-получатель для обработки. Теперь у меня такой вопрос:

Я создал 10 потоков производителей выше. И я хочу, чтобы размер пула потоков Consumer был равен 20. Но при обработке каждого идентификатора источник создает новый Runnable (Consumer) и отправляет (выполняет) его службе исполнителя Consumer. Насколько я понимаю, ExecutorService заключается в том, что Runnable, который вы ему отправляете, оборачивается в рабочий поток, а затем выполняется. Итак, в приведенном выше коде, если количество идентификаторов, которые получает каждый производитель, равно 50, действительно ли я создаю 50 * 10 = 500 потребительских потоков? Слишком много?

Или размер пула на самом деле означает количество рабочих потоков? Итак, в приведенном выше коде я создаю 500 задач для исполнителя Consumer, которые фактически будут поставлены в очередь и выполнены 20 рабочими потоками? Возможно, я не объясняю это правильно, но здесь немного запутался внутренней реализацией исполнителя и беспокоился, если я создаю слишком много потоков Consumer.

Если это не способ реализовать это, может ли кто-нибудь предложить лучший подход? Спасибо.


person Oxford    schedule 19.08.2011    source источник


Ответы (2)


Действительно ли размер пула означает количество рабочих потоков? да.

Если процесс Runnable потребителя занимает много времени, то одновременно будет запущено только 20. Остальные будут ждать в коллекции, пока не станет доступен поток для ее запуска.

Что касается того, есть ли способ сделать это лучше. Есть ли причина, по которой вам нужно использовать потоки? Если у вас нет 20 доступных процессоров, работающих параллельно, это может не увеличить время обработки, потому что все потоки будут проводить время в переключениях контекста и т. Д., Которые бесполезны для обработки данных.

Кроме того, производители получают все данные и хранят их в Потребителях. Если потребители не могут работать, потому что у вас их 500 и только 20 могут работать одновременно, вы сохраняете (500 минус 20) * данные, которые вы можете обработать. Вы можете заставить потребителей получать свои собственные данные.

В ответ на комментарий:

вместо

for (int index = 0; index < producerPoolSize; index++) {
    Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
    prodExec.execute(producer);
}

и процессор

for (Long id: list) {
    data = get data from db for the id
    consExec.execute(new Consumer(data));
}

Потребитель выглядит так:

public class Consumer implements Runnable {

     long myId;

     Consumer(long id){
       myId = id;
     }

     public void run() {
        data = get data from db for the id
        // do whatever a consumer does with data
     }
 }

и

private void start(String[] args) {

    // Get list of ids create a new consumer for each id

    for (int index = 0; index < everyID.length; index++) {
        consExec.execute(new Consumer(everyID[i]));
    }

}

Тогда вы теряете целый класс, и пул 20 имеет больше смысла, потому что потребители, которые заблокированы при получении данных ввода-вывода, будут ждать, а те, которые готовы, могут продолжить обработку.

person Clint    schedule 19.08.2011
comment
Вы можете заставить потребителей получать свои собственные данные. - Вы имеете в виду промежуточную очередь? - person Oxford; 19.08.2011
comment
Привет, Клинт, я реализовал то же самое, что и вы, но угадайте, что выполнение одного потока занимает меньше времени, чем запуск с ExecutorService со счетчиком 10. Можете ли вы помочь мне узнать, что мне не хватает? - person Suryaprakash Pisay; 05.03.2018
comment
Некоторые ситуации лучше подходят для однопоточного выполнения! - person Clint; 26.04.2018

Размер пула определяет количество рабочих потоков. Если вы попытаетесь отправить элемент, когда все рабочие потоки заняты, он будет поставлен в очередь ExecutorService и запустится, как только рабочий станет свободным.

javadocs скажи это:

Создает пул потоков, который повторно использует фиксированный набор потоков, работающих с общей неограниченной очередью. Если какой-либо поток завершается из-за сбоя во время выполнения перед завершением работы, новый поток займет его место, если это необходимо для выполнения последующих задач.

Обратите внимание на подсвеченные части. Количество потоков фиксированное, а очередь неограниченная, то есть элементы, отправленные, когда потоки заняты, всегда будут помещаться в очередь, а не отклоняться.

person dlev    schedule 19.08.2011
comment
Спасибо. Итак, в моем коде выше 500 задач будут поставлены в очередь и выполнены 20 рабочими потоками? Если да, то влияет ли создание 500 Runnables, как в приведенном выше примере, на приложение? Я не создаю слишком много Runnables? - person Oxford; 19.08.2011
comment
Правильно, задачи будут поставлены в очередь и будут использовать только 20 потоков. Создание 500 Runnable не должно быть проблемой; они довольно легкие и, скорее всего, быстро умрут. Решение мне кажется подходящим. - person dlev; 19.08.2011
comment
За исключением того, что 500 потребителей хранят все данные, которые они должны обработать. - person Clint; 19.08.2011
comment
@Clint Справедливый пункт. Обычный шаблон состоит в том, чтобы потребители извлекали данные из очереди производителя. - person dlev; 19.08.2011