У меня есть очень длинная последовательность строк, которые по отдельности должны быть обработаны некоторой функцией обработки, а затем собраны как другой объект последовательности. Проблема кажется идеально подходящей для атаки типа fork/join.
Функция является членом класса, создание экземпляра которого требует больших затрат. Но создание экземпляра и совместное использование одного объекта класса между фьючерсами, казалось, вызывало проблемы, поэтому я создаю в 4 раза больше доступных процессоров, а затем распределяю их между фьючерсами.
// instantiate processing class objects
val processors = 1 to SomeNumber map (x=> new MyProcessor)
val processorstream = Stream.continually(processors).flatten
// the input strings
val input: Seq[String] = some sequence of strings
val splitinput = input.grouped(some large number).toStream
// create futures
val mytask = splitinput.zip(processorstream).collect {
case (subseq of strings, processor) => future {
map elements of subsequence of strings with processor}}
Затем я собираю вывод так
val result = mytask.map(x => x.apply()).reduce(_++_) // or some appropriate concatenation operator
Моя проблема в том, что это не дает мне полной загрузки процессора, хотя у меня 8 ядер. Он использует только одно ядро.
Чтобы исследовать, альтернатива, которую я пробовал, была
val input: Seq[String] = some sequence of strings
// no stage where I split the input into subsequences
val mytask = input.zip(processorstream).collect {
case (string, processor) => future {
process string with processor}}
val result = mytask.map(x => x.apply())
Эта альтернатива и работала, и не работала. Он достиг полной загрузки процессора, но было выдано несколько исключений, потому что (гипотеза) процессор слишком быстро обрабатывал каждую строку, и иногда один и тот же объект процессора применялся к разным строкам одновременно.
Я еще более уверен в своей гипотезе о том, что процессоры работают слишком быстро, потому что, если я ввожу более длинный ввод (скажем, целые текстовые документы вместо заголовков из 10 слов), я получаю полную загрузку процессора без каких-либо исключений.
Я также пробовал экспериментировать с фьючерсами akka и обещаниями scalaz, и все они, кажется, используют только один процессор, когда я разделяю входную последовательность на подпоследовательности.
Итак, как мне получить полную загрузку ЦП с фьючерсами в этом случае, используя подпоследовательности строк в качестве входных данных?
map
параллельно. См. docs.scala-lang.org/overviews/parallel-collections/< /а> - person Kim Stebel   schedule 18.08.2012