Правильная реализация сценария производитель-потребитель и корректное завершение пула потоков

Я работаю над своим первым многопоточным проектом, и поэтому у меня есть пара вещей, в которых я не уверен. Подробная информация о моей настройке была в предыдущем вопросе, вкратце: у меня есть пул потоков, реализованный Executors.newFixedThreadPool(N). Одному потоку дается действие, которое выполняет серию запросов к локальным и удаленным ресурсам и итеративно заполняет ArrayBlockingQueue, в то время как остальные потоки вызывают метод take() в очереди и обрабатывают объекты в очереди.

Несмотря на то, что небольшие и контролируемые тесты, похоже, проходят нормально, я не знаю, как мне справиться со специальными сценариями, такими как начало (в очереди еще нет элементов), конец (очередь пуста) и любые возможные InterruptedExceptions. Я немного почитал здесь о SO, что привело меня к двум действительно хорошим статьям Гетц и Кабуц. Похоже, консенсус заключается в том, что не следует игнорировать эти исключения. Однако я не уверен, как представленные примеры относятся к моей ситуации, я нигде не вызывал thread.interrupt() в своем коде... Говоря об этом, я не уверен, должен ли я был это сделать...

Подводя итог, учитывая приведенный ниже код, как лучше всего обрабатывать особые случаи, такие как критерии завершения и InterrrruptedExceptions? Надеюсь, что вопросы имеют смысл, иначе я сделаю все возможное, чтобы описать это дальше.

Заранее спасибо,


изменить: я уже некоторое время работаю над реализацией и столкнулся с новой заминкой, поэтому решил обновить ситуацию. Мне не повезло столкнуться с ConcurrentModificationException, что, скорее всего, произошло из-за неполного отключения/прекращения пула потоков. Как только я понял, что могу использовать isTerminated(), я попробовал это, потом получил IllegalMonitorStateException из-за несинхронизированного wait(). Текущее состояние кода показано ниже:

Я последовал некоторым советам из ответа @Jonathan, однако я не думаю, что его предложение работает совсем так, как мне нужно/хочу. Предыстория такая же, как я упомянул выше, и соответствующие фрагменты кода таковы:

Класс, владеющий/управляющий пулом, и отправка исполняемых файлов:

public void serve() {
    try {
        this.started = true;
        pool.execute(new QueryingAction(pcqs));
        for(;;){
            PathwayImpl p = bq.take();

            if (p.getId().equals("0")){
                System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
                pool.shutdown();
                            // give 3 minutes per item in queue to finish up
                pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
                break;
            }
            int sortMethod = AnalysisParameters.getInstance().getSort_method();
            pool.submit(new AnalysisAction(p)); 
        }
      } catch (Exception ex) {
          ex.printStackTrace();
          System.err.println("Unexpected error in core analysis, terminating execution!");
          System.exit(0);
      }finally{   pool.shutdown();     }
}

public boolean isDone(){
    if(this.started)
        return pool.isTerminated();
    else
        return false;
    }

Элементы добавляются в очередь следующим кодом, расположенным в отдельном классе:

this.queue.offer(path, offer_wait, TimeUnit.MINUTES);

... мотивация offer() вместо take(), как упоминал Джонатан. Непредвиденные блоки раздражают и их трудно понять, так как мой анализ занимает много времени. Поэтому мне нужно относительно быстро узнать, происходит ли сбой из-за плохого блока или это просто перебор чисел...


и наконец; вот код в моем тестовом классе, где я проверяю взаимодействие между «службой параллелизма» (названной здесь cs) и остальными анализируемыми объектами:

cs.serve();
synchronized (this) {
    while(!cs.isDone())
    this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();

Я понимаю, что это был ОЧЕНЬ длинный вопрос, но я попытался быть подробным и конкретным. Надеюсь, это не будет слишком затруднительно, и я извиняюсь, если это...


person posdef    schedule 16.03.2011    source источник


Ответы (1)


Вместо использования take, который блокирует, используйте что-то вроде этого:

PathwayImpl p = null;
synchronized (bq) {
    try {
        while (bq.isEmpty() && !stopSignal) {
            bq.wait(3000); // Wait up to 3 seconds and check again
        }

        if (!stopSignal) {
            p = bq.poll();
        }
    }
    catch (InterruptedException ie) {
        // Broke us out of waiting, loop around to test the stopSignal again
    }
}

Это предполагает, что блок заключен в какой-то while (!stopSignal) {...}.

Затем в коде, который добавляет в очередь, сделайте следующее:

synchronized (bq) {
    bq.add(item);
    bq.notify();
}

Что касается InterruptedExceptions, они хороши для того, чтобы сигнализировать потоку о немедленной проверке стоп-сигнала, вместо того, чтобы ждать следующего тайм-аута и проверки. Я предлагаю просто снова проверить ваш стоп-сигнал и, возможно, зарегистрировать исключение.

Я использую их при сигнале паники, а не при обычном отключении, но такая ситуация требуется редко.

person Jonathan    schedule 16.03.2011
comment
Спасибо за ваш ответ Джонатан. Не могли бы вы уточнить пару моментов? Прежде всего, почему бы не take(), лучше иметь дело с нулями или с исключениями? Во-вторых, я не очень понимаю, почему вы помещаете вещи в отдельный синхронизированный блок, поскольку bq уже должен быть потокобезопасной коллекцией, я ошибаюсь? И, наконец, как вы предлагаете мне реализовать сигнал остановки; например, как логическое значение? - person posdef; 17.03.2011
comment
1) take() заблокируется, требуя, чтобы вы вручную прервали поток, если вы хотите, чтобы он знал, что он должен проверить stopSignal. 2) Синхронизированные блоки позволяют использовать wait() и notify(). 3) stopSignal будет булевой переменной экземпляра, вероятно, переключаемой какой-то функцией public void requestHalt() или подобной. - person Jonathan; 17.03.2011