Я пытаюсь создать много задач с интенсивным использованием ЦП, используя Scala Futures а>. Поскольку их так много, мне нужно ограничить создание этих заданий (потоков). Для этого я использую:
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent._
val numThread = sys.runtime.availableProcessors
import java.util.concurrent.ExecutorService
import java.util.concurrent.ArrayBlockingQueue
implicit val context = ExecutionContext.fromExecutorService(
new ThreadPoolExecutor(
numThread, numThread,
0L, TimeUnit.SECONDS,
new ArrayBlockingQueue[ Runnable ]( numThread ) {
override def offer( e: Runnable ) = {
put( e ); // Waiting for empty room
true
}
})
)
Чтобы проверить это, я создал 2 очень простые функции:
import scala.util.{ Try, Success, Failure }
import scala.util.Random
def longComputation() = {
val id = Thread.currentThread().getId
//blocking {
println( s"Started thread: $id" )
Thread.sleep( 500 )
println( s"Finished thread: $id" )
//}
id
}
def processResult[T](r : Try[T]) = {
blocking {
r match {
case Success( id ) => println( s"Thread result: $id" )
case Failure( t ) => println( "An error has occured: " + t.getMessage )
}
}
}
Затем я выполняю тест для выполнения задач через многопоточность:
def main( args: Array[ String ] ) {
val s = Stream.from( 0 )
//s.foreach { x => println(x) ; val f = Future( longComputation ) ; f.onComplete{ processResult } }
s.foreach { x =>
println(x)
val f = Future( longComputation )
val p = Promise[Long]()
p completeWith f
p.future.onComplete{ processResult }
}
println("Finished")
context.shutdown
}
Когда я выполнил это, было запущено 16 потоков (количество процессоров равно 8). Программа напечатала сообщение Finished. Затем система блокируется, и больше ничего не выполняется. Однако если я удаляю обратный вызов, то потоки выполняются до бесконечности, как и ожидалось.
Выше я экспериментировал с blocking
, а также с Promise
. Никаких изменений в поведении. Итак, мой вопрос: как я могу ограничить выполнение задачи, не блокируя обратные вызовы? Если это невозможно, можно ли выполнять ввод-вывод в потоках (в будущем)?
Цените любые указатели.