Во время просмотра доклада Роба Пайка Go Concurrency Patterns talk я наткнулся на Реализация Concurrent Prime Sieve в Go, которую Роб назвал красивым параллельным кодом. Достаточно любопытно, я проверил пример и решил реализовать его в Clojure.

Начнем с создания генератора бесконечной последовательности (начиная с 2):

(defn num-generator [out]
  (go
    (loop [n 2]
      (>!! out n)
      (recur (inc n)))))

Блок go гарантирует, что тело выполняется в отдельном потоке. Я использовал блокировку вместо парковочной, чтобы генерировать номер только тогда, когда это требуется программе. Если бы вместо этого я использовал парковочный пут >!, мы бы столкнулись с исключением OutOfMemory при выполнении функции.

Давайте протестируем этот генератор:

(def ichan (chan)) ; Blocks at the put statement, waiting for a read 
(num-generator ichan) 
(<!! ichan) ; => 2 
(<!! ichan) ; => 3 
(<!! ichan) ; => 4

Конечно, не лучший способ проверить, но, похоже, он работает так, как ожидалось.

Далее давайте напишем функцию фильтра, которая принимает входной канал, выходной канал и значение и отфильтровывает непростые числа из входного канала перед отправкой их в выходной канал. Это то, к чему относится «основное сито».

(defn filter-prime [in out val]
  (go
    (loop [n 0]
      (let [i (<!! in)]
        (when (not (= (mod i val) 0))
          (>!! out i)))
      (recur (inc n)))))

Прежде чем тестировать filter-prime, давайте свяжем две вышеупомянутые функции вместе с пользовательским вводом, используя fetch-prime:

((defn fetch-prime [count]
  (let [ch (chan)]
    (num-generator ch)
    (loop [n count
           c ch]
      (when (> n 0)
        (let [p (<!! c)
              ch1 (chan)]
          ;; Just printing the prime nos for now
          (println "Prime => " p)
          (filter-prime c ch1 p)
          (recur (dec n) ch1))))
    (close! ch)))

И, наконец, проверьте, работает ли это:

(fetch-prime 6)
; Prime => 2
; Prime => 3
; Prime => 5
; Prime => 7
; Prime => 11
; Prime => 13

Весь процесс можно представить в виде набора сит, выстроенных друг над другом вертикально, через которые проходят числа из генератора. По сути, каждое сито представляет собой filter-prime с последним простым числом val, переданным в качестве параметра. На любом проходе, если значение модуля равно нулю, число (полученное из генератора) в этом проходе отбрасывается.

Хотя я немного потрудился над этим, в конце концов я получил гораздо лучшее представление о модели параллелизма, используя блоки go в Clojure. То, как различные потоки взаимодействуют друг с другом в виде фильтра последовательной цепочки для удаления не простых чисел, делает этот пример отличным примером для демонстрации параллелизма.