Мне нужно распараллелить существующую фоновую задачу таким образом, чтобы вместо последовательного потребления ресурсов «x» она параллельно завершала работу, используя только потоки «y» (y ‹‹ x). Эта задача постоянно работает в фоновом режиме и продолжает обрабатывать некоторые ресурсы.
Код структурирован следующим образом:
class BaseBackground implements Runnable {
@Override
public void run() {
int[] resources = findResources(...);
for (int resource : resources) {
processResource(resource);
}
stopProcessing();
}
public abstract void processResource(final int resource);
public void void stopProcessing() {
// Override by subclass as needed
}
}
class ChildBackground extends BaseBackground {
@Override
public abstract void processResource(final int resource) {
// does some work here
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
}
Я изменил ChildBackground
следующим образом:
class ChildBackground extends BaseBackground {
private final BlockingQueue<Integer> resourcesToBeProcessed;
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource) {
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
// does some work
}
}
}
}
Я не создаю и не удаляю ExecutorService каждый раз, потому что сборка мусора — это небольшая проблема в моем сервисе. Хотя я не понимаю, насколько это будет плохо, так как я не буду создавать более 10 потоков на каждой итерации.
Я не могу понять, как мне дождаться, пока все ResourceProcessor
закончат обработку ресурсов для одной итерации, чтобы я мог сбросить некоторые счетчики и выдать метрики в stopProcessing
. Я рассматривал следующие варианты:
1) executorService.awaitTermination (тайм-аут). На самом деле это не сработает, так как всегда будет блокироваться до истечения времени ожидания, потому что потоки ResourceProcessor
никогда не закончат свою работу.
2) Я могу узнать количество ресурсов после findResources
и сделать его доступным для дочернего класса, и каждый ResourceProcessor
будет увеличивать количество обрабатываемых ресурсов. Мне придется подождать, пока все ресурсы будут обработаны в stopProcessing
, прежде чем сбрасывать счетчики. Мне нужно что-то вроде CountDownLatch, но вместо этого он должен считать UP
. В этом варианте будет много управления состоянием, что мне не особо нравится.
3) Я мог бы обновить public abstract void processResource(final int resource)
, чтобы включить подсчет общих ресурсов и заставить дочерний процесс ждать, пока все потоки не обработают общие ресурсы. В этом случае также будет некоторое управление состоянием, но оно будет ограничено дочерним классом.
В любом из двух случаев мне придется добавить логику wait() и notify(), но я не уверен в своем подходе. Вот что у меня есть:
class ChildBackground extends BaseBackground {
private static final int UNSET_TOTAL_RESOURCES = -1;
private final BlockingQueue<Integer> resourcesToBeProcessed;
private int totalResources = UNSET_TOTAL_RESOURCES;
private final AtomicInteger resourcesProcessed = new AtomicInteger(0);
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource, final int totalResources) {
if (this.totalResources == UNSET_TOTAL_RESOURCES) {
this.totalResources = totalResources;
} else {
Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);
}
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
try {
waitForAllResourcesToBeProcessed();
} catch (InterruptedException e) {
e.printStackTrace();
}
resourcesProcessed.set(0);
totalResources = UNSET_TOTAL_RESOURCES;
// reset some counts and emit metrics
}
private void incrementProcessedResources() {
synchronized (resourcesProcessed) {
resourcesProcessed.getAndIncrement();
resourcesProcessed.notify();
}
}
private void waitForAllResourcesToBeProcessed() throws InterruptedException {
synchronized (resourcesProcessed) {
while (resourcesProcessed.get() != totalResources) {
resourcesProcessed.wait();
}
}
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
try {
// does some work
} finally {
incrementProcessedResources();
}
}
}
}
}
Я не уверен, что использование AtomicInteger
является правильным способом сделать это, и если да, то нужно ли мне вызывать wait() и notify(). Если я не использую wait() и notify(), мне даже не нужно выполнять все в синхронизированном блоке.
Пожалуйста, дайте мне знать, что вы думаете об этом подходе, если я должен просто создавать и выключать ExecutorService для каждой итерации или есть четвертый подход, который я должен использовать.
Callables and Futures
на этой странице winterbe .com/posts/2015/04/07/ - person Scary Wombat   schedule 12.09.2016