Когда использовать неблокирующий ›! / потоки и блокировка ›!! /горутины с clojure core.async

Я пишу процесс ETL для чтения данных уровня событий из базы данных продукта, преобразования/объединения их и записи в хранилище данных аналитики. Я использую библиотеку clojure core.async, чтобы разделить эти процессы на параллельно выполняющиеся компоненты. Вот как сейчас выглядит основная часть моего кода

    (ns data-staging.main
        (:require [clojure.core.async :as async])
        (:use [clojure.core.match :only (match)]
              [data-staging.map-vecs]
              [data-staging.tables])
        (:gen-class))

    (def submissions (make-table "Submission" "Valid"))
    (def photos (make-table "Photo"))
    (def videos (make-table "Video"))
    (def votes (make-table "Votes"))

    ;; define channels used for sequential data processing
    (def chan-in (async/chan 100))
    (def chan-out (async/chan 100))

    (defn write-thread [table]
        "infinitely loops between reading subsequent 10000 rows from 
         table and ouputting a vector of the rows(maps) 
         into 'chan-in'"
        (while true
            (let [next-rows (get-rows table)]
                (async/>!! chan-in next-rows)
                (set-max table (:max-id (last next-rows))))))

    (defn aggregator []
        "takes output from 'chan-in' and aggregates it by coupon_id, date.
         then adds / drops any fields that are needed / not needed and inputs
         into 'chan-out'"
        (while true
            (->>
                (async/<!! chan-in)
                aggregate
                (async/>!! chan-out))))

    (defn read-thread []
        "reads data from chan out and interts into Analytics DB" 
        (while true 
            (upsert (async/<!! chan-out))))

    (defn -main []
        (async/thread (write-thread submissions))
        (async/thread (write-thread photos))
        (async/thread (write-thread videos))
        (async/thread-call aggregator)
        (async/thread-call read-thread))

Как видите, я помещаю каждый компонент ОС в отдельный поток и использую блокировку >!! Звоните по каналам. Это похоже на использование неблокирующего >! вызовы вместе с подпрограммами go могут быть лучше для этого варианта использования, особенно для операций чтения базы данных, которые тратят большую часть своего времени на выполнение операций ввода-вывода и ожидание новых строк в базе данных продукта. Так ли это, и если да, то как лучше всего это реализовать? Я немного не понимаю всех компромиссов между двумя методами и того, как именно эффективно использовать подпрограммы go. Также будут высоко оценены любые другие предложения по улучшению общей архитектуры!


person Sean Geoffrey Pietz    schedule 29.01.2014    source источник
comment
Если по какой-либо причине на этот вопрос трудно ответить, может ли кто-нибудь указать, как я могу отредактировать его, чтобы сделать его более ясным. Кроме того, если кто-нибудь знает хороший онлайн-учебник по процедурам clojure go, я тоже был бы признателен.   -  person Sean Geoffrey Pietz    schedule 30.01.2014


Ответы (2)


Лично я думаю, что ваше использование потоков здесь, вероятно, правильное решение. Волшебная неблокирующая природа go-blocks происходит от «парковки», которая представляет собой особый вид псевдоблокировки, который использует конечный автомат core.async, но, поскольку ваша база данных вызывает действительно блок, а не переводит конечный автомат в припаркованное состояние. , вы просто заблокируете какой-то поток из пула потоков core.async. Это зависит от того, сколько времени занимают ваши синхронные вызовы, так что это тот случай, когда эталонные тесты могут быть информативными, но я сильно подозреваю, что потоки здесь — правильный подход.

Единственным исключением является ваша функция агрегатора. Мне кажется, что это можно просто сложить в определение чан-аута, как (def chan-out (map< aggregate chan-in)).

Для общего обзора go-blocks по сравнению с потоками Мартин Тройер написал хороший рассмотрение двух подходов и определения того, какой из них быстрее в какой ситуации. Версия Cliff Notes заключается в том, что блоки go хороши для адаптации уже асинхронных библиотек для использования с core.async, а потоки хороши для создания асинхронных процессов из синхронных частей. Например, если бы ваша база данных имела API на основе обратного вызова, то go-blocks был бы определенным выигрышем. Но так как это синхронно, они не подходят.

person Chuck    schedule 30.01.2014
comment
отличный ответ! Спасибо, что поделились, я многому учусь на этом сайте - person tangrammer; 01.02.2014

я думаю, что было бы лучше использовать макросы "go", чтобы иметь неблокирующие потоки в этом случае ETL.

Я написал очень простой код для достижения синхронизированной последовательности процессов, подразумеваемой в задачах извлечения, преобразования и загрузки.

Введите в ответе следующий код:

(require '[clojure.core.async :as async :refer [<! >! <!! timeout chan alt! go]])

(def output(chan))

(defn extract [origin]
  (let [value-extracted (chan)
        value-transformed (chan)
        value-loaded (chan)]
    (go
     (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
     (>! value-extracted  (str origin " > extracted  ")))
    (go
     (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
     (>! value-transformed  (str (<! value-extracted) " > transformed " )))
    (go
     (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
     (>! value-loaded  (str (<! value-transformed) " > loaded " )))
    (go
     (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
     (>! output  [origin (<! value-loaded)]))))

(go
 (loop [origins-already-loaded []]
   (let [[id message] (<! output)
         origins-updated (conj origins-already-loaded id)]
     (println message)
     (println origins-updated)
     (recur origins-updated)
     )
   ))

Введите в ответе:

(doseq [example (take 10 (range))] (extract example))

1 > extracted   > transformed  > loaded 
[1]
7 > extracted   > transformed  > loaded 
[1 7]
0 > extracted   > transformed  > loaded 
[1 7 0]
8 > extracted   > transformed  > loaded 
[1 7 0 8]
3 > extracted   > transformed  > loaded 
[1 7 0 8 3]
6 > extracted   > transformed  > loaded 
[1 7 0 8 3 6]
2 > extracted   > transformed  > loaded 
[1 7 0 8 3 6 2]
5 > extracted   > transformed  > loaded 
[1 7 0 8 3 6 2 5]
9 > extracted   > transformed  > loaded 
[1 7 0 8 3 6 2 5 9]
4 > extracted   > transformed  > loaded 
[1 7 0 8 3 6 2 5 9 4]

ОБНОВЛЕНИЕ:
исправлена ​​ошибка, заключавшаяся в использовании <!! (timeout (+ 100 (* 100 (rand-int 20))))) внутри удаленной функции "подожди-ка", которая блокировала другие процессы без блокировки.

person tangrammer    schedule 30.01.2014
comment
Круто, я попробую это. Любая причина, по которой этот подход лучше, у меня небольшие проблемы с пониманием преимуществ/недостатков горутин и потоков для этого типа использования. - person Sean Geoffrey Pietz; 30.01.2014
comment
Еще раз привет @SeanGeoffreyPietz, что касается вашего комментария, я нашел ошибку * в своем коде и исправил ее. Причиной отказа от использования блокирующих потоков является производительность. С макросами go вы можете иметь тысячи независимых процессов (псевдопотоков) swannodette.github .io/2013/08/02/100000-processes и в приведенном выше сценарии вы также можете (doseq [example (take 1000 (range))] (extract example)) оценить поведение. - person tangrammer; 30.01.2014