Проблема справедливости в ScheduledExecutorService

В следующем примере показана проблема в ScheduledExecutorService. Я планирую две задачи «1» и «2», которые выполняются дольше, чем интервал расписания. Задача «2» отправляет другую задачу на выполнение только один раз.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestExecutorFairness {
  public static void main(final String[] args) {

    final int interval = 200;
    final int sleeptime = 600;

    final ScheduledExecutorService executor = Executors
        .newSingleThreadScheduledExecutor();

    // schedule task 1
    executor.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(sleeptime);
        } catch (final InterruptedException e) {
          e.printStackTrace();
        }

        System.out.println("1");
      }
    }, interval, interval, TimeUnit.MILLISECONDS);

    // schedule task 2
    executor.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(sleeptime);
        } catch (final InterruptedException e) {
          e.printStackTrace();
        }

        System.out.println("2");

        // submit task 3
        executor.submit(new Runnable() {

          @Override
          public void run() {
            System.out.println("3");
          }
        });
      }
    }, interval, interval, TimeUnit.MILLISECONDS);

  }
}

Результат, который я ожидаю, будет чем-то вроде

1
2
1
2
3

Но так не выполняется. Задача "3" очень долго задерживается, но мне нужно, чтобы она была выполнена как можно скорее.

Есть ли способ изменить это поведение на более справедливое? Или у кого-то есть лучшее решение?


person Stephan    schedule 12.02.2014    source источник


Ответы (1)


Интересно. Это кажется нелогичным, потому что JvaDoc ScheduledExecutorService явно упоминает

Команды, отправленные с использованием методов Executor.execute(java.lang.Runnable) и ExecutorService, планируются с запрошенной нулевой задержкой.

Таким образом, можно предположить, что должно быть возможно отправлять такие команды. Но в этом случае есть некоторые особенности. Я не могу указать ТОЧНУЮ причину такого поведения, но это явно связано с

  • Задачи, занимающие больше времени, чем интервал расписания
  • Новая задача отправляется из выполненной задачи
  • Тот факт, что ScheduledExecutorService внутри использует DelayedWorkQueue
  • Самое главное: вы используете однопоточный ScheduledExecutorService.

Серьезной проблемой также может быть то, что это заполняет рабочую очередь и рано или поздно приведет к ошибке OutOfMemoryError. Это также можно увидеть в этом (слегка скорректированном) примере:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestExecutorFairness {
  public static void main(final String[] args) {

    final int interval = 200;
    final int sleeptime = 600;

    final ScheduledExecutorService executor = 
        Executors.newScheduledThreadPool(1);

    final long start = System.currentTimeMillis();

    // schedule task 1
    executor.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(sleeptime);
        } catch (final InterruptedException e) {
          e.printStackTrace();
        }

        System.out.println("1 at "+(System.currentTimeMillis()-start));
      }
    }, interval, interval, TimeUnit.MILLISECONDS);

    // schedule task 2
    executor.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(sleeptime);
        } catch (final InterruptedException e) {
          e.printStackTrace();
        }

        System.out.println("2 at "+(System.currentTimeMillis()-start));


        System.out.println("Submitting 3 to "+executor);
        // submit task 3
        executor.submit(new Runnable() {

          @Override
          public void run() {
              System.out.println("3 at "+(System.currentTimeMillis()-start));
          }
        });
      }
    }, interval, interval, TimeUnit.MILLISECONDS);

  }
}

Количество "задач в очереди" в Исполнителе постоянно увеличивается.

Решение в этом случае достаточно простое: вместо

Executors.newScheduledThreadPool(1)

вы можете просто создать

Executors.newScheduledThreadPool(3)

Конечно, это меняет «временное поведение» в этом примере. Я должен предположить, что Thread.sleep() в этом примере предназначался исключительно для имитации сложных вычислений, которые не вписывались в этот пример кода. Но, возможно, просто убедиться, что количество потоков не менее numberOfPeriodicTasks+1, также можно применить в вашем реальном приложении.

person Marco13    schedule 12.02.2014
comment
Я также рассматривал возможность большего количества потоков, но Thread.sleep() является заполнителем для связи Modbus TCP. Библиотека для этого не предназначена для многопоточности. Я также мог бы разделить на одного исполнителя для планирования и одного для выполнения, но это заполнит очередь исполняющего, если задачи будут занимать слишком много времени. - person Stephan; 12.02.2014
comment
Таким образом, есть 2 периодические задачи, которые могут занять больше времени, чем интервал планирования, и они блокируют текущий поток - я не могу представить, как в этом случае можно выполнить третью задачу немедленно. Единственное решение, которое я вижу сейчас, исходя из текущего описания, состоит в том, чтобы НЕ планировать Task3, а просто выполнять соответствующее Runnable непосредственно в Task2, но, поскольку это слишком очевидно, я предполагаю, что есть причина, по которой вы этого не сделали... - person Marco13; 12.02.2014
comment
Я выполнил задачу 3 в задаче 2, чтобы имитировать более поздний вызов. Но я хочу, чтобы прямые отправки имели более высокий приоритет, чем запланированные задачи. - person Stephan; 12.02.2014
comment
Да, я посмотрел исходный код ScheduledExecutorService и т. д. и подумал, что можно заменить DelayedWorkQueue очередью с приоритетом. Но инфраструктура вокруг этого (внутренняя обертка всех задач в ScheduledFutureTasks и т. д.) делает это сложным. В зависимости от точных требований можно рассмотреть возможность создания собственной реализации ScheduledExecutorService с желаемым поведением, но это может потребовать значительных усилий. Может быть, кто-то найдет более простое и элегантное решение. - person Marco13; 12.02.2014
comment
@Stephan Вы приняли это, но решило ли это проблему (или, по крайней мере, помогло решить ее)? В противном случае, возможно, специальный вопрос о чем-то вроде назначения приоритета задачам в однопоточном ScheduledExecutorService или около того может привлечь некоторое внимание и принести дополнительную помощь (особенно когда вы упоминаете ограничение, что оно должно уже быть однопоточным) в вопросе) - person Marco13; 15.02.2014
comment
Я принял ответ, потому что он помог мне найти проблему. Мой обходной путь — это задача перепланирования, которая измеряет время, отменяет запланированные задачи и перепланирует их с более длинным интервалом времени. - person Stephan; 15.02.2014
comment
Это звучит как хлопот. Я думаю, что некоторая концепция приоритетов в ScheduledExecutorService была бы неплохой. Но, как я уже упоминал, кажется, что это может быть сложно реализовать на основе существующих реализаций (на самом деле, я думаю, что реализовать это с нуля в форме собственного PriorityScheduledExecutorService может быть проще). Однако, если вы нашли удовлетворительное решение, то пока все в порядке. - person Marco13; 15.02.2014