Дросселирование блоков Scala Future при использовании onComplete

Я пытаюсь создать много задач с интенсивным использованием ЦП, используя Scala Futures . Поскольку их так много, мне нужно ограничить создание этих заданий (потоков). Для этого я использую:

import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent._

val numThread = sys.runtime.availableProcessors

import java.util.concurrent.ExecutorService
import java.util.concurrent.ArrayBlockingQueue

implicit val context = ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(
      numThread, numThread,
      0L, TimeUnit.SECONDS,
      new ArrayBlockingQueue[ Runnable ]( numThread ) {
        override def offer( e: Runnable ) = {
          put( e ); // Waiting for empty room
          true
        }
      })
     )

Чтобы проверить это, я создал 2 очень простые функции:

import scala.util.{ Try, Success, Failure }
import scala.util.Random

def longComputation() = {
  val id = Thread.currentThread().getId
  //blocking {
    println( s"Started thread: $id" )
    Thread.sleep( 500 )
    println( s"Finished thread: $id" )
  //}
  id
}

def processResult[T](r : Try[T]) = {
  blocking {
      r match {
        case Success( id ) => println( s"Thread result: $id" )
        case Failure( t )  => println( "An error has occured: " + t.getMessage )
       }
  }

}

Затем я выполняю тест для выполнения задач через многопоточность:

def main( args: Array[ String ] ) {


   val s = Stream.from( 0 )
   //s.foreach { x => println(x) ;  val f = Future( longComputation ) ; f.onComplete{ processResult } }

   s.foreach { x => 
     println(x) 
     val f = Future( longComputation )  
     val p = Promise[Long]()
     p completeWith f
     p.future.onComplete{ processResult } 
   }

   println("Finished")
   context.shutdown
 } 

Когда я выполнил это, было запущено 16 потоков (количество процессоров равно 8). Программа напечатала сообщение Finished. Затем система блокируется, и больше ничего не выполняется. Однако если я удаляю обратный вызов, то потоки выполняются до бесконечности, как и ожидалось.

Выше я экспериментировал с blocking, а также с Promise. Никаких изменений в поведении. Итак, мой вопрос: как я могу ограничить выполнение задачи, не блокируя обратные вызовы? Если это невозможно, можно ли выполнять ввод-вывод в потоках (в будущем)?

Цените любые указатели.


person user2051561    schedule 05.12.2016    source источник


Ответы (2)


Программа работает в тупике. Предоставленный threadPool имеет фиксированный размер, поэтому происходит следующее: Future(longComputation) выделяет поток из пула потоков и начинает работать. Когда это будет сделано, onComplete выделяет Thread из пула для выполнения предоставленной функции.

Учитывая, что выполнение работы занимает больше времени, чем ее завершение, в какой-то момент все потоки заняты выполнением работы. Любой из них завершается, а onComplete также нуждается в потоке, поэтому он запрашивает его у исполнителя. Работа не может быть завершена, так как все потоки заняты и машина останавливается в тупиковой ситуации.

Мы можем разрешить этот тупик между производителем и потребителем, предоставив потребителю зарезервированные ресурсы. Таким образом, работа регулируется пулом потоков фиксированного размера, но мы гарантируем, что любая завершенная работа может быть обработана дальше.

Этот фрагмент, где я переименовал context в fixedContext, показывает использование отдельного контекста для обработки результатов, устраняя взаимоблокировку. Я также избавился от Promise, который не выполнял никакой реальной функции, кроме прокси будущего.

val fixedContext = // same as in question
val singleThreadContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
...
...
def main( args: Array[ String ] ) {

   val s = Stream.from( 0 )

   s.foreach { x => 
     println(x)
     val f = Future( longComputation )(fixedContext)  
     f.onComplete{ processResult }(singleThreadContext)
   }

   println("Finished")
   fixedContext.shutdown
 } 
}
person maasg    schedule 05.12.2016
comment
Контекст выполнения @Dima OP основан на пуле потоков с фиксированным размером и исчерпал ресурсы. global не имеет ограничений и будет продолжать принимать работу, в конечном итоге заканчиваясь на java.lang.OutOfMemoryError. Предлагаемый вариант будет работать стабильно, т.к. работа отделена от потребления. - person maasg; 06.12.2016
comment
@Dima Я изменил контекст выполнения, чтобы обработать результаты, чтобы избежать путаницы и сделать мою точку зрения о разделении двух пулов более ясной. - person maasg; 06.12.2016
comment
Первоначальные тесты показывают, что это работает. Однако один вопрос: зачем выбирать контекст с одним потоком? Я могу придерживаться этого, потому что я буду выполнять ввод-вывод на одном ресурсе, но меня интересуют причины, лежащие в основе этого варианта. - person user2051561; 06.12.2016
comment
@user2051561 user2051561 Я использовал контекст одного потока, чтобы четко проиллюстрировать, что нам нужны отдельные контексты. В этом примере будет достаточно одного потока, но в реальной жизни используйте ресурсы, размер которых соответствует варианту использования. - person maasg; 06.12.2016
comment
Хорошо, понятно. - person user2051561; 06.12.2016
comment
Это не очень хорошее решение: во-первых, оно нарушает исходное ограничение, поскольку имеет больше потоков, чем предполагалось. И что еще более важно, он сериализует обратные вызовы, делая единственный поток, выполняющий их, узким местом. Кстати, @maasg к вашему замечанию о нехватке памяти ... вы понимаете, что ваш singleThreadContext имеет неограниченную очередь (которой способствуют все выполнения), не так ли? ;) - person Dima; 06.12.2016
comment
@Dima - твой аргумент не всегда может быть верным. Учтите, что обратные вызовы становятся очередью только после завершения основных задач. Поскольку мы ограничиваем основные задачи, обратные вызовы также ограничиваются. Ошибка OOM была бы проблемой только в том случае, если бы обратные вызовы выполнялись намного дольше, чем основные задачи. В моем случае это не так - я буду просто записывать результаты, пока основные задачи загружают процессор. - person user2051561; 06.12.2016
comment
@user2051561 user2051561 Обратные вызовы не должны занимать много времени. Помните, что все они выполняются в одном потоке, при этом вычисления распараллелены. Итак, пока они занимают менее чем в 8 раз меньше времени, чем расчет в среднем, у вас есть проблема. Обратите внимание, что число 8 относится к вашей машине. Если вы перенесете свой код на 16-процессорный блок, вам теперь потребуется, чтобы обратные вызовы выполнялись в 16 раз быстрее, чем вычисления... Это не хорошее решение, оно хрупкое и неэффективное. И я говорю это не только потому, что мой лучше, поверьте мне :) - person Dima; 06.12.2016
comment
@Dima Дима, я буду помнить о твоем решении - количество кода, которое нужно изменить, минимально. Благодарю вас за то, что вы указываете на проблемы здесь. - person user2051561; 06.12.2016
comment
@Dima Цель состоит в том, чтобы показать подход на минимальном рабочем примере. Я не утверждаю, что окончательное решение должно использовать один поток. Давайте проведем различие между концепцией и примером: концепция состоит в том, чтобы отделить ресурсы от производства и потребления. В примере используется один поток для потребления, b/c это минимум, который здесь необходим. - person maasg; 06.12.2016
comment
@Dima, это нарушает исходное ограничение, имея больше потоков, чем предполагалось -> использовать n-1 + 1 поток. Я не думаю, что подход ломается в этом вопросе. - person maasg; 06.12.2016
comment
@Dima re: Это не очень хорошее решение, оно хрупкое и неэффективное. Я категорически не согласен. Разделение ресурсов имеет важное значение в системах, имеющих дело с процессами, которые имеют большое несоответствие импеданса, например, в этом вопросе тяжелый процессор и легкий ввод-вывод. Пример: веб-сервер выделит пул потоков для обслуживания соединений, резервируя отдельный пул для обработки БД. Вы можете отказаться принимать новое соединение, но хотите, чтобы все существующие соединения выполнялись. Это тот же принцип, который я хочу передать здесь. - person maasg; 06.12.2016
comment
@ user2051561 Я ответил на вопросы. Я не думаю, что аргументы в силе. - person maasg; 06.12.2016

Когда поток завершает longComputation, он пытается поместить задание в очередь для выполнения обратного вызова и блокируется, поскольку очередь заполнена. Итак, в конце концов, первая «партия» заданий завершается, но все потоки все еще заняты, ожидая в очереди, чтобы запланировать обратный вызов, и ничего не доступно для извлечения из очереди.

Решение? Удалить лимит из очереди. Таким образом, потоки, пытающиеся отправить обратные вызовы, не будут заблокированы и станут доступными для выполнения следующей задачи.

Возможно, вы захотите что-то вставить в ваш цикл производителя, чтобы немного замедлить его, чтобы ваша неограниченная очередь не съедала всю память. Возможно, Semaphore?

val sem = new Semaphore(numThread*2)
def processResult[T](r : Try[T]) = blocking {
  r match {
    case Success( id ) => println( s"Thread result: $id" )
    case Failure( t )  => println( "An error has occured: " + t.getMessage )
  }
  sem.release
}

Stream.from(0).foreach { _ => 
  sem.acquire
  new Future(longComputation).onComplete(processResult)
}

С этим вам не нужен ваш собственный контекст выполнения - значение по умолчанию scala действительно будет работать лучше для того, что вы хотите сделать.

person Dima    schedule 05.12.2016
comment
Хорошо, теперь я понимаю, почему тупик. Обратите внимание, что, как указывает maasg, если я использую контекст по умолчанию (поддерживаемый ForkJoinPool), очередь будет расти бесконечно и приведет к ошибке OOM. Что касается семафора, я хочу избежать синхронизации с моей стороны, чтобы избежать тупика ;-) - person user2051561; 06.12.2016
comment
Да, я знаю, что он будет расти бесконечно - таким образом, семафор. Что касается предотвращения синхронизации, если вы думаете, что явная отправка обратного вызова в другой пул или использование BlockingQueue не синхронизируется... подумайте еще раз :) Это просто усложняет решение и нарушает ограничение проблемы (вы заканчиваете иметь больше потоков, чем вы хотели) и сериализовать обратные вызовы, что, возможно, станет узким местом - person Dima; 06.12.2016
comment
@user2051561 user2051561 Связывание производителя и потребителя в одном потоке, как это сделано здесь, будет полезно, только если оба процесса имеют одинаковые ограничения ресурсов и приоритеты. В этом случае тяжелые вычисления связаны с легкой операцией ввода-вывода, я хотел бы, чтобы мои потоки ЦП были освобождены как можно скорее, чтобы они могли взять на себя новую работу, и не давал бы им задачу ввода-вывода, которая потенциально может заблокировать ожидание внешних ресурсов (диск, сеть, ...). Количество потоков, назначаемых производителю и потребителю, будет зависеть от этих ограничений. - person maasg; 06.12.2016