Как мы узнаем, что threadPoolExecutor завершил выполнение

У меня есть родительский поток, который отправляет сообщения в MQ, и он управляет ThreadPoolExecutor для рабочих потоков, которые слушают MQ и записывают сообщение в выходной файл. Я управляю пулом потоков размером 5. Поэтому, когда я запускаю свою программу, у меня есть 5 файлов с сообщениями. Пока все работает нормально. Теперь мне нужно объединить эти 5 файлов в моем родительском потоке.

Как мне узнать, что ThreadPoolExecutor завершил обработку, и я могу начать объединение файлов.

public class ParentThread {
    private MessageSender messageSender;
    private MessageReciever messageReciever;
    private Queue jmsQueue;
    private Queue jmsReplyQueue;
    ExecutorService exec = Executors.newFixedThreadPool(5);

    public void sendMessages() {
        System.out.println("Sending");
        File xmlFile = new File("c:/filename.txt");
        List<String> lines = null;
        try {
            lines = FileUtils.readLines(xmlFile, null);
        } catch (IOException e) {
            e.printStackTrace();
        }
        for (String line : lines){
            messageSender.sendMessage(line, this.jmsQueue, this.jmsReplyQueue);
        }
        int count = 0;
        while (count < 5) {
            messageSender.sendMessage("STOP", this.jmsQueue, this.jmsReplyQueue);
            count++;
        }

    }

    public void listenMessages() {
        long finishDate = new Date().getTime();
        for (int i = 0; i < 5; i++) {
            Worker worker = new Worker(i, this.messageReciever, this.jmsReplyQueue);
            exec.execute(worker);
        }
        exec.shutdown();

        if(exec.isTerminated()){ //PROBLEM is HERE. Control Never gets here.
            long currenttime = new Date().getTime() - finishDate;
            System.out.println("time taken: "+currenttime);
            mergeFiles();
        }
    }
}

Это мой рабочий класс

public class Worker implements Runnable {

    private boolean stop = false;
    private MessageReciever messageReciever;
    private Queue jmsReplyQueue;
    private int processId;
    private int count = 0;

    private String message;
    private File outputFile;
    private FileWriter outputFileWriter;

    public Worker(int processId, MessageReciever messageReciever,
            Queue jmsReplyQueue) {
        this.processId = processId;
        this.messageReciever = messageReciever;
        this.jmsReplyQueue = jmsReplyQueue;
    }

    public void run() {
        openOutputFile();
        listenMessages();
    }


    private void listenMessages() {
        while (!stop) {
            String message = messageReciever.receiveMessage(null,this.jmsReplyQueue);
            count++;
            String s = "message: " + message + " Recieved by: "
                    + processId + " Total recieved: " + count;
            System.out.println(s);
            writeOutputFile(s);
            if (StringUtils.isNotEmpty(message) && message.equals("STOP")) {
                stop = true;
            }
        }
    }

    private void openOutputFile() {
        try {
            outputFile = new File("C:/mahi/Test", "file." + processId);
            outputFileWriter = new FileWriter(outputFile);
        } catch (IOException e) {
            System.out.println("Exception while opening file");
            stop = true;
        }
    }

    private void writeOutputFile(String message) {
        try {
            outputFileWriter.write(message);
            outputFileWriter.flush();
        } catch (IOException e) {
            System.out.println("Exception while writing to file");
            stop = true;
        }
    }
}

Как я узнаю, что ThreadPool завершит обработку, чтобы я мог выполнить другую работу по очистке?

Спасибо


person Mahi G    schedule 07.08.2013    source источник


Ответы (3)


Что-то вроде этого:

    exec.shutdown();

    // waiting for executors to finish their jobs
    while (!exec.awaitTermination(50, TimeUnit.MILLISECONDS));

    // perform clean up work
person Ivan Babanin    schedule 07.08.2013
comment
Спасибо SilverHaze. Именно то, что я искал. - person Mahi G; 09.08.2013

Если ваш класс Worker реализует Callable вместо Runnable, то вы сможете увидеть, когда ваши потоки завершатся, используя объект Future, чтобы увидеть, вернул ли поток какой-либо результат (например, логическое значение, которое сообщит вам, завершил ли он выполнение или нет ).

Загляните в раздел «8. Futures and Callables» на веб-сайте ниже, там есть именно то, что вам нужно, imo: http://www.vogella.com/articles/JavaConcurrency/article.html

Изменить: Итак, после того, как все фьючерсы указывают, что выполнение их соответствующего вызываемого объекта завершено, можно с уверенностью предположить, что ваш исполнитель завершил выполнение и может быть отключен / завершен вручную.

person Evgheni Crujcov    schedule 07.08.2013

Вы можете использовать поток для мониторинга ThreadPoolExecutor, подобного этому

import java.util.concurrent.ThreadPoolExecutor;

public class MyMonitorThread implements Runnable {
     private ThreadPoolExecutor executor;

        private int seconds;

        private boolean run=true;

        public MyMonitorThread(ThreadPoolExecutor executor, int delay)
        {
            this.executor = executor;
            this.seconds=delay;
        }

        public void shutdown(){
            this.run=false;
        }

        @Override
        public void run()
        {
            while(run){
                    System.out.println(
                        String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                            this.executor.getPoolSize(),
                            this.executor.getCorePoolSize(),
                            this.executor.getActiveCount(),
                            this.executor.getCompletedTaskCount(),
                            this.executor.getTaskCount(),
                            this.executor.isShutdown(),
                            this.executor.isTerminated()));
                    try {
                        Thread.sleep(seconds*1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            }

        }

}

И добавить

MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
            Thread monitorThread = new Thread(monitor);
            monitorThread.start(); 

в ваш класс, в котором расположен ThreadPoolExecutor.

Он будет показывать состояние ваших нитевидных переключателей каждые 3 секунды.

person neverwinter    schedule 04.12.2013