реализация PriorityQueue на ThreadPoolExecutor

Боролся с этим уже более 2-х дней.

реализовал ответ, который я видел здесь Указать выполнение порядка задач в Java

public class PriorityExecutor extends ThreadPoolExecutor {

public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//Utitlity method to create thread pool easily
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new PriorityExecutor(nThreads, nThreads, 0L,
            TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
}
//Submit with New comparable task
public Future<?> submit(Runnable task, int priority) {
    return super.submit(new ComparableFutureTask(task, null, priority));
}
//execute with New comparable task
public void execute(Runnable command, int priority) {
    super.execute(new ComparableFutureTask(command, null, priority));
}
}

public class ComparableFutureTask<T> extends FutureTask<T>
    implements
    Comparable<ComparableFutureTask<T>> {

volatile int priority = 0;

public ComparableFutureTask(Runnable runnable, T result, int priority) {
    super(runnable, result);
    this.priority = priority;
}
public ComparableFutureTask(Callable<T> callable, int priority) {
    super(callable);
    this.priority = priority;
}

@Override
public int compareTo(ComparableFutureTask<T> o) {
    return Integer.valueOf(priority).compareTo(o.priority);
}
}

Используемый мной Runnable: MyTask

public class MyTask implements Runnable{

 public MyTask(File file, Context context, int requestId) {
    this._file = file;
    this.context = context;
    this.requestId = requestId;
}

@Override
public void run() {
      // some work
    } catch (IOException e) {
        Log.e("Callable try", post.toString());

    }
}

Мой сервис: MediaDownloadService

public class MediaDownloadService extends Service {

private DBHelper helper;
Notification notification;
HashMap<Integer,Future> futureTasks = new HashMap<Integer, Future>();
final int _notificationId=1;
File file;

@Override
public IBinder onBind(Intent intent) {
    return sharonsBinder;
}


@Override
public int onStartCommand(Intent intent, int flags, int startId) {
    helper = new DBHelper(getApplicationContext());
    PriorityExecutor executor = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(3);
    Log.e("requestsExists", helper.requestsExists() + "");
   if(helper.requestsExists()){
        // map of the index of the request and the string of the absolute path of the request
        Map<Integer,String> requestMap = helper.getRequestsToExcute(0);
        Set<Integer> keySet = requestMap.keySet();
        Iterator<Integer> iterator = keySet.iterator();
        Log.e("MAP",requestMap.toString());
        //checks if the DB requests exists
        if(!requestMap.isEmpty()){
            //execute them and delete the DB entry
            while(iterator.hasNext()){
                int iteratorNext = iterator.next();
                Log.e("ITREATOR", iteratorNext + "");
                file = new File(requestMap.get(iteratorNext));
                Log.e("file", file.toString());
                Log.e("thread Opened", "Thread" + iteratorNext);
                Future future = executor.submit(new MyTask(file, this, iteratorNext),10);
                futureTasks.put(iteratorNext, future);
                helper.requestTaken(iteratorNext);
            }
            Log.e("The priority queue",executor.getQueue().toString());
        }else{

            Log.e("stopself", "stop self after this");
            this.stopSelf();
        }
    }
    return START_STICKY;
}

продолжайте получать ошибку в этой строке: Future future = executor.submit (new MyTask (file, this, iteratorNext), 10);

Даже если executor.submit(); предположим, чтобы вернуть объект будущего, который я продолжаю получать

Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
        at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:318)
        at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:450)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1331)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:81)
        at com.vit.infibond.test.PriorityExecutor.submit(PriorityExecutor.java:26)
        at com.vit.infibond.test.MediaDownloadService.onStartCommand(MediaDownloadService.java:65)

Может ли кто-нибудь спасти меня от этого кошмара?

Я попытался сделать так, как предлагает этот ответ, а также

добавив переопределение forNewTask только для того, чтобы снова получить кастинг, но на этот раз для RunnableFuture.

Я понимаю, что в моем понимании чего-то не хватает, и был бы признателен за подробное объяснение...


person sharon gur    schedule 01.06.2015    source источник
comment
Спасибо, Шарон, я расширил ваши комментарии ниже. довольно животрепещущая тема   -  person V H    schedule 23.09.2016


Ответы (2)


Глядя на исходный код для java.util.concurrent.ThreadPoolExecutor, кажется, что заставить его работать при отправке фьючерсов очень сложно. Вы должны переопределить защищенные методы, которые кажутся внутренними, и делать некоторые неприятные приведения.

Я предлагаю вместо этого просто использовать метод execute. Там нет переноса Runnable, поэтому он должен работать.

Если вам нужно дождаться результатов своей работы, я предлагаю реализовать это самостоятельно, чтобы не возиться с внутренними компонентами ThreadPoolExecutor.

person K Erlandsson    schedule 01.06.2015
comment
Моя проблема не в том, чтобы дождаться завершения моих заданий, но я хочу управлять ими, пока они находятся в очереди. Например, у меня есть элементы в очереди, и мне нужно прямо сейчас улучшить один из их приоритетов, поэтому мне нужна ссылка на этот будущий объект, чтобы я мог отменить эту задачу и повторно вставить ее с более высоким приоритетом. И буду признателен, если объясните немного больше о его реализации самостоятельно, вы имеете в виду создать пул потоков самостоятельно? - person sharon gur; 01.06.2015
comment
Чтобы изменить приоритет, вам нужен доступ к ComparableFutureTask, так как это то, что находится в очереди, а не Future. Следовательно, для этой цели вы все еще можете использовать executor.execute() вместо submit(). - person K Erlandsson; 01.06.2015
comment
executor.execute() вернет ComparableFutureTask? Если нет, как я доберусь до задачи, которую хочу отменить?.. Я думал, что для изменения приоритета я должен отменить и снова поставить задачу, а отмена выполняется через будущий объект.. или это так? - person sharon gur; 01.06.2015
comment
@sharongur Нет, это не так, но поскольку вы реализовали PriorityExecutor, вы можете создать собственный метод, чтобы возвращать его при выполнении. - person K Erlandsson; 01.06.2015
comment
Великолепно! Абсолютно гениально! Я изменил метод выполнения, чтобы он возвращал ComparableFutureTask, и добавил метод для повышения приоритета с помощью идентификатора моей хэш-карты, содержащей объекты Comparable, а затем просто изменил его приоритет. Большое спасибо! - person sharon gur; 01.06.2015

Что Шарон Гур предлагает в самом низу, так это изменить

//execute with New comparable task
public void execute(Runnable command, int priority) {
    super.execute(new ComparableFutureTask(command, null, priority));
}

to

//execute with New comparable task
public ComparableFutureTask  execute(Runnable command, int priority) {
    ComparableFutureTask task = new ComparableFutureTask(command, null, priority);
    super.execute(task);
    return task;
}

Затем в вашем вызывающем абоненте:

CurrentTask currentTask = new CurrentTask(priority,queue)
RunnableFuture task = enhancedExecutor.execute(currentTask,priority.value)
task?.get()

У меня была проблема, где

RunnableFuture task = myExecutor.submit(currentTask)
task?.get()

вызвало проблему, когда currentTask затем было преобразовано в FutureTask и не понимало моих объектов в CurrentTask. Как .execute один все было хорошо. Этот хак кажется полу/почти достаточно работающим.

так как он работал отлично, но файл не был создан

RunnableFuture task = myExecutor.execuute(currentTask)
    task?.get()

Так вот как я заставил это работать (приоритет обрабатывается дважды), это не кажется правильным, но работает...

Текущая задача::

class CurrentTask implements Runnable {
    private Priority priority
    private MyQueue queue

    public int getPriority() {
        return priority.value
    }

    public CurrentTask(Priority priority,ReportsQueue queue){
        this.priority = priority
        this.queue=queue
    }

    @Override
    public void run() {
...
}
}

Приоритет:

public enum Priority {

    HIGHEST(0),
    HIGH(1),
    MEDIUM(2),
    LOW(3),
    LOWEST(4)

    int value

    Priority(int val) {
        this.value = val
    }

    public int getValue(){
        return value
    }
}

Затем ваш исполнитель звонит

public YourExecutor() {

    public YourExecutor() {
        super(maxPoolSize,maxPoolSize,timeout,TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(1000,new ReverseComparator()))
    }

Поэтому, прежде чем перейти к новому методу отправки, нажмите компаратор ниже, и, будучи TaskExecutor, он не поймет .priority?.value, который по умолчанию .execute currentTask был тем, что ударило по этому, и все это сработало.

public int compare(final Runnable lhs, final Runnable rhs) {

    if(lhs instanceof Runnable && rhs instanceof Runnable){
      // Favour a higher priority
        println "${lhs} vs ${lhs.getClass()}"
      if(((Runnable)lhs)?.priority?.value<((Runnable)rhs)?.priority?.value){
 ...
}

}

так что с вышеуказанным взломом и ниже изменения, похоже, работают

class  ReverseComparator implements Comparator<ComparableFutureTask>{

  @Override
  public int compare(final ComparableFutureTask lhs, final ComparableFutureTask rhs) {

    if(lhs instanceof ComparableFutureTask && rhs instanceof ComparableFutureTask){

        // run higher priority (lower numbers before higher numbers)
        println "${lhs} vs ${lhs.getClass()} ::: ${lhs.priority}"
      if(((Runnable)lhs)?.priority<((Runnable)rhs)?.priority){
          println "-returning -1"
        return -1;
      } else if (((Runnable)lhs)?.priority>((Runnable)rhs)?.priority){
      println "-returning @@@1"
        return 1;
      } 


    }
    println "-returning ==0 "
    return 0;
  }  

Просто потому, что мы передали переопределение ComparableFutureTask, которое имеет приоритет, расширяющий FutureTask.

Надеюсь, это имеет смысл ходить в течение дня и немного сейчас :)

person V H    schedule 23.09.2016
comment
рабочую реализацию можно найти здесь github.com/vahidhedayati/grails-queuekit-plugin - person V H; 21.10.2016