Блокировка в будущем scala?

У меня есть очень длинная последовательность строк, которые по отдельности должны быть обработаны некоторой функцией обработки, а затем собраны как другой объект последовательности. Проблема кажется идеально подходящей для атаки типа fork/join.

Функция является членом класса, создание экземпляра которого требует больших затрат. Но создание экземпляра и совместное использование одного объекта класса между фьючерсами, казалось, вызывало проблемы, поэтому я создаю в 4 раза больше доступных процессоров, а затем распределяю их между фьючерсами.

// instantiate processing class objects
val processors = 1 to SomeNumber map (x=> new MyProcessor)
val processorstream = Stream.continually(processors).flatten
// the input strings
val input: Seq[String] = some sequence of strings
val splitinput = input.grouped(some large number).toStream
// create futures
val mytask = splitinput.zip(processorstream).collect {
    case (subseq of strings, processor) => future {
        map elements of subsequence of strings with processor}}

Затем я собираю вывод так

val result = mytask.map(x => x.apply()).reduce(_++_) // or some appropriate concatenation operator

Моя проблема в том, что это не дает мне полной загрузки процессора, хотя у меня 8 ядер. Он использует только одно ядро.

Чтобы исследовать, альтернатива, которую я пробовал, была

val input: Seq[String] = some sequence of strings
// no stage where I split the input into subsequences
val mytask = input.zip(processorstream).collect {
    case (string, processor) => future {
        process string with processor}}
val result = mytask.map(x => x.apply())

Эта альтернатива и работала, и не работала. Он достиг полной загрузки процессора, но было выдано несколько исключений, потому что (гипотеза) процессор слишком быстро обрабатывал каждую строку, и иногда один и тот же объект процессора применялся к разным строкам одновременно.

Я еще более уверен в своей гипотезе о том, что процессоры работают слишком быстро, потому что, если я ввожу более длинный ввод (скажем, целые текстовые документы вместо заголовков из 10 слов), я получаю полную загрузку процессора без каких-либо исключений.

Я также пробовал экспериментировать с фьючерсами akka и обещаниями scalaz, и все они, кажется, используют только один процессор, когда я разделяю входную последовательность на подпоследовательности.

Итак, как мне получить полную загрузку ЦП с фьючерсами в этом случае, используя подпоследовательности строк в качестве входных данных?


person JasonMond    schedule 18.08.2012    source источник
comment
Почему бы не использовать .par из параллельных коллекций?   -  person om-nom-nom    schedule 18.08.2012
comment
@om-nom-nom Что это? И как бы это применить здесь? Я взял scala только несколько месяцев назад.   -  person JasonMond    schedule 18.08.2012
comment
Параллельные коллекции выполняют операции вроде map параллельно. См. docs.scala-lang.org/overviews/parallel-collections/< /а>   -  person Kim Stebel    schedule 18.08.2012


Ответы (2)


Per @om-nom-nom :

input.par.map { s => task(s) }
person Connor Doyle    schedule 18.08.2012
comment
И вы даже можете избавиться от шума: задача карты input.par - person Viktor Klang; 20.08.2012
comment
Виктор, это твое представление об отдыхе? :) - person Connor Doyle; 21.08.2012

Вы можете попробовать использовать ThreadLocal для своих изменяемых процессоров. Довольно бесполезный пример:

val words = io.Source.fromFile("/usr/share/dict/words").getLines.toIndexedSeq

class Processor {
  val sb = new StringBuffer() // mutable!
  println("---created processor---")
  def map(s: String): Int = {
    sb.setLength(0)
    for (i <- 1 to s.length()) {
      sb.append(s.substring(0, i))
    }
    sb.toString().sum.toInt  // don't make any sense out of this
  }
}

val tl = new ThreadLocal[Processor] {
  override protected def initialValue() = new Processor
}

val parRes = words.par.map(w => tl.get.map(w)).sum
val serRes = words.map(    w => tl.get.map(w)).sum
assert(parRes == serRes)

Это создаст по умолчанию столько потоков, сколько ядер ЦП, как свидетельствуют сообщения ---created processor---.

person 0__    schedule 18.08.2012