Параллельная фильтрация ленивой последовательности

У меня проблема, когда я ищу числа с определенными свойствами в очень большом пространстве поиска (возможно, бесконечном, но определенно слишком большом, чтобы все пространство поместилось в памяти). Поэтому мне нужна ленивая последовательность, которую я фильтрую. Мой наивный подход заключался в использовании понимания списка (for) для всего поиска, но при этом поиск выполняется в одном потоке. Есть несколько действительно простых способов сократить пространство поиска, а также есть некоторые части поиска, которые требуют больших вычислительных ресурсов.

Мой чуть менее наивный подход заключался в том, чтобы добавить простое сокращение в выражение for и сделать функцию search, которая выполняет более тяжелую работу. Затем: (filter search (for [..... :when (prune)])). В библиотеке редукторов есть функция filter, но она не работает с ленивыми последовательностями. Я не могу преобразовать из ленивого seq из-за ограничений памяти.

Итак, как лучше всего отфильтровать ленивую последовательность параллельно? Мой последний наивный подход был бы похож на вставку последовательности в атом:

(defn accessor-gen [lazys]
  (let [s (atom [nil lazys])]
    (fn []
      (first (swap! s (fn [[_ s]] [(first s) (rest s)]))))))

Тогда я мог бы иметь пул потоков из шести или около того, используя эту функцию для поиска в пространстве.

Вопрос: у меня неприятное ощущение, что я делаю это сложнее, чем нужно. Кроме того, меня беспокоит спор по поводу атома. Есть ли более простой способ использовать ленивую последовательность параллельно? Наконец, является ли весь мой подход в корне ошибочным? Есть ли способ лучше, возможно, тот, который не требует ленивых последовательностей?


person galdre    schedule 12.10.2014    source источник


Ответы (1)


Первое, что я хотел бы попробовать, это отфильтровать pmapped seq:

(defn search [i]
        (println (Thread/currentThread) i)
        (when (zero? (rem i 10))
          i))

(take 10 (filter identity (pmap search (range))))

Фильтрация будет происходить в одном потоке, но поиск будет вычисляться параллельно.

Если то, что вы действительно хотите делать параллельно, - это фильтрация, вам нужно будет разделить ленивую последовательность и объединить результаты:

(defn search [numbers]
      (doall (filter (fn [i] (zero? (rem i 10))) numbers))) 

(take 10 (apply concat (pmap search (partition-all 1000 (range)))))
person DanLebrero    schedule 12.10.2014
comment
Я думаю, что твой первый ответ - это то, что я ищу. Второй вариант не сработает, если ленивая последовательность бесконечна. Просто проверяю: даже если все пространство, в котором производится поиск, слишком велико для экземпляра JVM, ленивость и фильтра, и pmap здесь означает, что исходная последовательность никогда не возникает полностью в памяти, верно? - person galdre; 13.10.2014
comment
Оба решения ленивы, поскольку concat тоже ленив. Я обновил ответ, чтобы показать это. - person DanLebrero; 14.10.2014
comment
Ой. Я думал, partition не был ленивым. И в этом был корень проблемы, из-за которой я сбился с пути. Спасибо! - person galdre; 14.10.2014