Подождите, пока потоки демона завершат итерацию с использованием службы-исполнителя.

Мне нужно распараллелить существующую фоновую задачу таким образом, чтобы вместо последовательного потребления ресурсов «x» она параллельно завершала работу, используя только потоки «y» (y ‹‹ x). Эта задача постоянно работает в фоновом режиме и продолжает обрабатывать некоторые ресурсы.

Код структурирован следующим образом:

class BaseBackground implements Runnable {
    @Override
    public void run() {
        int[] resources = findResources(...);

        for (int resource : resources) {
            processResource(resource);
        }

        stopProcessing();
     }

    public abstract void processResource(final int resource);
    public void void stopProcessing() {
         // Override by subclass as needed
    }
}

class ChildBackground extends BaseBackground {

    @Override
    public abstract void processResource(final int resource) {
        // does some work here
    }

    public void void stopProcessing() {
        // reset some counts and emit metrics
    }
}

Я изменил ChildBackground следующим образом:

class ChildBackground extends BaseBackground {

    private final BlockingQueue<Integer> resourcesToBeProcessed;

    public ChildBackground() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
             executorService.submit(new ResourceProcessor());
        }
    }

    @Override
    public abstract void processResource(final int resource) {
        resourcesToBeProcessed.add(resource);
    }

    public void void stopProcessing() {
        // reset some counts and emit metrics
    }

    public class ResourceProcessor implements Runnable {
        @Override
        public void run() {
            while (true) {
                int nextResource = resourcesToBeProcessed.take();
                // does some work
            }
        }
    }
}

Я не создаю и не удаляю ExecutorService каждый раз, потому что сборка мусора — это небольшая проблема в моем сервисе. Хотя я не понимаю, насколько это будет плохо, так как я не буду создавать более 10 потоков на каждой итерации.

Я не могу понять, как мне дождаться, пока все ResourceProcessor закончат обработку ресурсов для одной итерации, чтобы я мог сбросить некоторые счетчики и выдать метрики в stopProcessing. Я рассматривал следующие варианты:

1) executorService.awaitTermination (тайм-аут). На самом деле это не сработает, так как всегда будет блокироваться до истечения времени ожидания, потому что потоки ResourceProcessor никогда не закончат свою работу.

2) Я могу узнать количество ресурсов после findResources и сделать его доступным для дочернего класса, и каждый ResourceProcessor будет увеличивать количество обрабатываемых ресурсов. Мне придется подождать, пока все ресурсы будут обработаны в stopProcessing, прежде чем сбрасывать счетчики. Мне нужно что-то вроде CountDownLatch, но вместо этого он должен считать UP. В этом варианте будет много управления состоянием, что мне не особо нравится.

3) Я мог бы обновить public abstract void processResource(final int resource), чтобы включить подсчет общих ресурсов и заставить дочерний процесс ждать, пока все потоки не обработают общие ресурсы. В этом случае также будет некоторое управление состоянием, но оно будет ограничено дочерним классом.

В любом из двух случаев мне придется добавить логику wait() и notify(), но я не уверен в своем подходе. Вот что у меня есть:

class ChildBackground extends BaseBackground {

    private static final int UNSET_TOTAL_RESOURCES = -1;

    private final BlockingQueue<Integer> resourcesToBeProcessed;

    private int totalResources = UNSET_TOTAL_RESOURCES;
    private final AtomicInteger resourcesProcessed = new AtomicInteger(0);

    public ChildBackground() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
             executorService.submit(new ResourceProcessor());
        }
    }

    @Override
    public abstract void processResource(final int resource, final int totalResources) {
        if (this.totalResources == UNSET_TOTAL_RESOURCES) {
            this.totalResources = totalResources;
        } else {
            Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);
        }
        resourcesToBeProcessed.add(resource);
    }

    public void void stopProcessing() {
        try {
            waitForAllResourcesToBeProcessed();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        resourcesProcessed.set(0);
        totalResources = UNSET_TOTAL_RESOURCES;
        // reset some counts and emit metrics
    }

    private void incrementProcessedResources() {
        synchronized (resourcesProcessed) {
            resourcesProcessed.getAndIncrement();
            resourcesProcessed.notify();
        }
    }

    private void waitForAllResourcesToBeProcessed() throws InterruptedException {
        synchronized (resourcesProcessed) {
             while (resourcesProcessed.get() != totalResources) {
                resourcesProcessed.wait();
             }
        }
    }

    public class ResourceProcessor implements Runnable {
        @Override
        public void run() {
            while (true) {
                int nextResource = resourcesToBeProcessed.take();
                try {
                   // does some work
                } finally {
                   incrementProcessedResources();
                }
            }
        }
    }
}

Я не уверен, что использование AtomicInteger является правильным способом сделать это, и если да, то нужно ли мне вызывать wait() и notify(). Если я не использую wait() и notify(), мне даже не нужно выполнять все в синхронизированном блоке.

Пожалуйста, дайте мне знать, что вы думаете об этом подходе, если я должен просто создавать и выключать ExecutorService для каждой итерации или есть четвертый подход, который я должен использовать.


person user1071840    schedule 12.09.2016    source источник
comment
Взгляните на Callables and Futures на этой странице winterbe .com/posts/2015/04/07/   -  person Scary Wombat    schedule 12.09.2016
comment
Фьючерсы не подходят, потому что потоки исполнителя работают в тесном цикле, который останавливается только тогда, когда служба деактивируется.   -  person user1071840    schedule 12.09.2016
comment
Быть уведомленным, когда задание, данное исполнителю, выполнено, является целью мысли о будущем. Конечно, вы не можете использовать, например, docs.oracle.com/javase/8/docs/api/java/util/concurrent/ ? Вы отправляете один пакет ресурсов исполнителям (накапливая 1 будущее за ресурсом), создаете будущее, которое ожидает их всех вместе, и заставляете последнее вычислять ваши показатели. После представления этого последнего будущего вы можете забыть обо всех них. В реактивном стиле.   -  person GPI    schedule 12.09.2016


Ответы (1)


Ваш код кажется излишне сложным. Зачем иметь собственную очередь, если внутри ExecutorService уже есть очередь? Вам приходится заниматься администрированием, когда я думаю, что вы можете позволить акции ExecutorService справиться с этим за вас.

Я бы определил вашу работу как:

public static class ResourceProcessor implements Runnable {
   private final int resource;
   public ResourceProcessor(int resource) {
      this.resource = resource;
   }
   public void run() {
      try {
         // does some work
      } finally {
         // if this is still necessary then you should use a `Future` instead
         incrementProcessedResources();
      }
   }
}

Затем вы можете отправить их следующим образом:

ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < totalResources; ++i) {
     executorService.submit(new ResourceProcessor(i));
}
// shutdown the thread pool after the last submit
executorService.shutdown();

executorService.awaitTermination(timeout). На самом деле это не сработает, так как всегда будет блокироваться до истечения времени ожидания, потому что потоки ResourceProcessor никогда не закончат свою работу.

Теперь это сработает.

2) Я могу узнать количество ресурсов [завершено].

Вам это все еще нужно, если вы можете позвонить awaitTermination(...)?

3) Я мог бы обновить общедоступный абстрактный void processResource (final int resource), чтобы включить подсчет общих ресурсов и заставить дочерний процесс ждать, пока все потоки не обработают общие ресурсы...

Тот же вопрос. Это необходимо?

Если вам действительно нужно знать список обработанных запросов, вы можете, как упоминал @ScaryWombat, использовать Future<Integer> и Callable<Integer> или использовать ExecutorCompletionService.

Фьючерсы не подходят, потому что потоки исполнителя работают в тесном цикле, который останавливается только тогда, когда служба деактивируется.

Можете ли вы объяснить это подробнее?

Надеюсь это поможет.

person Gray    schedule 14.09.2016
comment
О боже, я просто был глуп. Я совершенно забыл, что могу отправить в службу-исполнитель больше задач (потоков), чем размер пула потоков, спасибо, что напомнили мне об этом. - person user1071840; 14.09.2016
comment
Чтобы быть точным @ user1071840, задания - это просто Runnables, а не потоки, так что правильно, вы можете добавить 100 тыс. заданий в свой пул потоков, который обрабатывает их только с двумя потоками. - person Gray; 14.09.2016