Проблема: мне нужно написать приложение для обработки нескольких сотен файлов, на выполнение каждого из которых уйдет несколько сотен мегабайт и несколько секунд. Я написал его с использованием Future[Report]
объектов, созданных с помощью Executors.newFixedThreadPool()
, но получил ошибки нехватки памяти, потому что объект List[Future[Report]]
, возвращенный ExecutorService.invokeAll()
, удерживал промежуточную память, используемую каждым процессом. Я решил проблему, вернув объекты Report
из локальных методов в процессорах после вычисления значений Report
(всего несколько сотен строк на Report
) вместо выполнения вычислений в методе call
(из интерфейса Callable
).
Вместо этого я хотел бы попробовать решить эту проблему с помощью Scala Actors. Я создал класс, который принимает последовательность заданий (параметризованные типы для заданий, результатов и функции обработки) и обрабатывает каждое в одном из настраиваемого числа экземпляров Worker
(подкласс Actor
). Код следует.
Проблемы:
Я не уверен, что моя обработка верна.
Я не люблю использовать
CountDownLatch
для задержки возврата результата от диспетчера.Я бы предпочел написать более «функциональную» версию диспетчера, которая не изменяет
jobsQueue
список илиworkers
хэш-карту, возможно, заимствуя хвостовую рекурсивную структуруloop
из Clojure (я использовал метод@tailrec def loop
в другом коде Scala).
Я с нетерпением жду публикации «Актеры в Скале» Филиппа Галлера и Фрэнка Соммерса.
Вот код:
package multi_worker
import scala.actors.Actor
import java.util.concurrent.CountDownLatch
object MultiWorker {
private val megabyte = 1024 * 1024
private val runtime = Runtime.getRuntime
}
class MultiWorker[A, B](jobs: List[A],
actorCount: Int)(process: (A) => B) {
import MultiWorker._
sealed abstract class Message
// Dispatcher -> Worker: Run this job and report results
case class Process(job: A) extends Message
// Worker -> Dispatcher: Result of processing
case class ReportResult(id: Int, result: B) extends Message
// Worker -> Dispatcher: I need work -- send me a job
case class SendJob(id: Int) extends Message
// Worker -> Dispatcher: I have stopped as requested
case class Stopped(id: Int) extends Message
// Dispatcher -> Worker: Stop working -- all jobs done
case class StopWorking extends Message
/**
* A simple logger that can be sent text messages that will be written to the
* console. Used so that messages from the actors do not step on each other.
*/
object Logger
extends Actor {
def act() {
loop {
react {
case text: String => println(text)
case StopWorking => exit()
}
}
}
}
Logger.start()
/**
* A worker actor that will process jobs and return results to the
* dispatcher.
*/
class Worker(id: Int)
extends Actor{
def act() {
// Ask the dispatcher for an initial job
dispatcher ! SendJob(id)
loop {
react {
case Process(job) =>
val startTime = System.nanoTime
dispatcher ! ReportResult(id, process(job))
val endTime = System.nanoTime
val totalMemory = (runtime.totalMemory / megabyte)
val usedMemory = totalMemory - (runtime.freeMemory / megabyte)
val message = "Finished job " + job + " in " +
((endTime - startTime) / 1000000000.0) +
" seconds using " + usedMemory +
"MB out of total " + totalMemory + "MB"
Logger ! message
dispatcher ! SendJob(id)
case StopWorking =>
dispatcher ! Stopped(id)
exit()
}
}
}
}
val latch = new CountDownLatch(1)
var res = List.empty[B]
/**
* The job dispatcher that sends jobs to the worker until the job queue
* (jobs: TraversableOnce[A]) is empty. It then tells the workers to
* stop working and returns the List[B] results to the caller.
*/
val dispatcher = new Actor {
def act() {
var jobQueue = jobs
var workers = (0 until actorCount).map(id => (id, new Worker(id))).toMap
workers.values.foreach(_.start())
loop {
react {
case ReportResult(id, result) =>
res = result :: res
if (jobQueue.isEmpty && workers.isEmpty) {
latch.countDown()
exit()
}
case SendJob(id) =>
if (!jobQueue.isEmpty) {
workers(id) ! Process(jobQueue.head)
jobQueue = jobQueue.tail
}
case Stopped(id) =>
workers = workers - id
}
}
}
}
dispatcher.start()
/**
* Get the results of the processing -- wait for the dispatcher to finish
* before returning.
*/
def results: List[B] = {
latch.await()
res
}
}