Я работаю над своим первым многопоточным проектом, и поэтому у меня есть пара вещей, в которых я не уверен. Подробная информация о моей настройке была в предыдущем вопросе, вкратце: у меня есть пул потоков, реализованный Executors.newFixedThreadPool(N)
. Одному потоку дается действие, которое выполняет серию запросов к локальным и удаленным ресурсам и итеративно заполняет ArrayBlockingQueue
, в то время как остальные потоки вызывают метод take()
в очереди и обрабатывают объекты в очереди.
Несмотря на то, что небольшие и контролируемые тесты, похоже, проходят нормально, я не знаю, как мне справиться со специальными сценариями, такими как начало (в очереди еще нет элементов), конец (очередь пуста) и любые возможные InterruptedExceptions
. Я немного почитал здесь о SO, что привело меня к двум действительно хорошим статьям Гетц и Кабуц. Похоже, консенсус заключается в том, что не следует игнорировать эти исключения. Однако я не уверен, как представленные примеры относятся к моей ситуации, я нигде не вызывал thread.interrupt()
в своем коде... Говоря об этом, я не уверен, должен ли я был это сделать...
Подводя итог, учитывая приведенный ниже код, как лучше всего обрабатывать особые случаи, такие как критерии завершения и InterrrruptedExceptions? Надеюсь, что вопросы имеют смысл, иначе я сделаю все возможное, чтобы описать это дальше.
Заранее спасибо,
изменить: я уже некоторое время работаю над реализацией и столкнулся с новой заминкой, поэтому решил обновить ситуацию. Мне не повезло столкнуться с ConcurrentModificationException
, что, скорее всего, произошло из-за неполного отключения/прекращения пула потоков. Как только я понял, что могу использовать isTerminated()
, я попробовал это, потом получил IllegalMonitorStateException
из-за несинхронизированного wait()
. Текущее состояние кода показано ниже:
Я последовал некоторым советам из ответа @Jonathan, однако я не думаю, что его предложение работает совсем так, как мне нужно/хочу. Предыстория такая же, как я упомянул выше, и соответствующие фрагменты кода таковы:
Класс, владеющий/управляющий пулом, и отправка исполняемых файлов:
public void serve() {
try {
this.started = true;
pool.execute(new QueryingAction(pcqs));
for(;;){
PathwayImpl p = bq.take();
if (p.getId().equals("0")){
System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
pool.shutdown();
// give 3 minutes per item in queue to finish up
pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
break;
}
int sortMethod = AnalysisParameters.getInstance().getSort_method();
pool.submit(new AnalysisAction(p));
}
} catch (Exception ex) {
ex.printStackTrace();
System.err.println("Unexpected error in core analysis, terminating execution!");
System.exit(0);
}finally{ pool.shutdown(); }
}
public boolean isDone(){
if(this.started)
return pool.isTerminated();
else
return false;
}
Элементы добавляются в очередь следующим кодом, расположенным в отдельном классе:
this.queue.offer(path, offer_wait, TimeUnit.MINUTES);
... мотивация offer()
вместо take()
, как упоминал Джонатан. Непредвиденные блоки раздражают и их трудно понять, так как мой анализ занимает много времени. Поэтому мне нужно относительно быстро узнать, происходит ли сбой из-за плохого блока или это просто перебор чисел...
и наконец; вот код в моем тестовом классе, где я проверяю взаимодействие между «службой параллелизма» (названной здесь cs) и остальными анализируемыми объектами:
cs.serve();
synchronized (this) {
while(!cs.isDone())
this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();
Я понимаю, что это был ОЧЕНЬ длинный вопрос, но я попытался быть подробным и конкретным. Надеюсь, это не будет слишком затруднительно, и я извиняюсь, если это...