Я пишу процесс ETL для чтения данных уровня событий из базы данных продукта, преобразования/объединения их и записи в хранилище данных аналитики. Я использую библиотеку clojure core.async, чтобы разделить эти процессы на параллельно выполняющиеся компоненты. Вот как сейчас выглядит основная часть моего кода
(ns data-staging.main
(:require [clojure.core.async :as async])
(:use [clojure.core.match :only (match)]
[data-staging.map-vecs]
[data-staging.tables])
(:gen-class))
(def submissions (make-table "Submission" "Valid"))
(def photos (make-table "Photo"))
(def videos (make-table "Video"))
(def votes (make-table "Votes"))
;; define channels used for sequential data processing
(def chan-in (async/chan 100))
(def chan-out (async/chan 100))
(defn write-thread [table]
"infinitely loops between reading subsequent 10000 rows from
table and ouputting a vector of the rows(maps)
into 'chan-in'"
(while true
(let [next-rows (get-rows table)]
(async/>!! chan-in next-rows)
(set-max table (:max-id (last next-rows))))))
(defn aggregator []
"takes output from 'chan-in' and aggregates it by coupon_id, date.
then adds / drops any fields that are needed / not needed and inputs
into 'chan-out'"
(while true
(->>
(async/<!! chan-in)
aggregate
(async/>!! chan-out))))
(defn read-thread []
"reads data from chan out and interts into Analytics DB"
(while true
(upsert (async/<!! chan-out))))
(defn -main []
(async/thread (write-thread submissions))
(async/thread (write-thread photos))
(async/thread (write-thread videos))
(async/thread-call aggregator)
(async/thread-call read-thread))
Как видите, я помещаю каждый компонент ОС в отдельный поток и использую блокировку >!! Звоните по каналам. Это похоже на использование неблокирующего >! вызовы вместе с подпрограммами go могут быть лучше для этого варианта использования, особенно для операций чтения базы данных, которые тратят большую часть своего времени на выполнение операций ввода-вывода и ожидание новых строк в базе данных продукта. Так ли это, и если да, то как лучше всего это реализовать? Я немного не понимаю всех компромиссов между двумя методами и того, как именно эффективно использовать подпрограммы go. Также будут высоко оценены любые другие предложения по улучшению общей архитектуры!