Как выполнять параллельные транзакции в Clojure

У меня есть последовательность клиентов, которую нужно обрабатывать параллельно. Я пытался использовать pmap для этого. Результат мучительно медленный, намного медленнее, чем последовательная реализация. Внутренняя функция process-customer имеет транзакцию. Очевидно, что pmap запускает все транзакции сразу, и в итоге они повторно пытаются убить производительность. Каков наилучший способ распараллелить это?

(defn process-customers [customers]
  (doall 
    (pmap 
      (fn [sub-customers]
        (doseq [customer sub-customers]
          (process-customer customer)))
      (partition-all 10 customers))))

EDIT: функция process-customer включает следующие шаги. Я пишу шаги для краткости. Все шаги находятся внутри транзакции, чтобы гарантировать, что другая параллельная транзакция не вызовет несоответствий, таких как отрицательный запас.

(defn- process-customer [customer]
  "Process `customer`. Consists of three steps:
  1. Finding all stores in which the requested products are still available.
  2. Sorting the found stores to find the cheapest (for the sum of all products).
  3. Buying the products by updating the `stock`.
)

РЕДАКТИРОВАТЬ 2: приведенная ниже версия process-customers имеет такую ​​же производительность, как параллельная версия process-customers выше. Ниже, очевидно, последовательно.

(defn process-customers [customers]
  "Process `customers` one by one. In this code, this happens sequentially."
  (doseq [customer customers]
    (process-customer customer)))

person Gakuo    schedule 29.04.2019    source источник
comment
Ничего не зная о том, что влечет за собой обработка, каковы ожидаемые характеристики производительности и т. Д., Я не знаю, как можно ожидать, что это будет отвечать. Попробуйте предоставить минимальный воспроизводимый пример -- кратчайший возможный код, который может запустить кто-то другой, чтобы смоделировать проблему и протестировать предлагаемые ими решения.   -  person Charles Duffy    schedule 30.04.2019
comment
(Хорошее решение может, например, состоять в том, чтобы сделать логику транзакции коммутативной; конечно, мы не можем сказать, возможно ли это, если не увидим этого).   -  person Charles Duffy    schedule 30.04.2019
comment
@CharlesDuffy, у меня есть правка. Обратите внимание, что это транзакция, которая просматривает ряд магазинов и получает самый дешевый из магазинов. Если покупатель хочет купить более одного товара, он должен купить все в самом дешевом магазине. Это может быть невозможно сформулировать как коммутативное. Затем запас этого магазина уменьшается. Конечно, нам не нужен отрицательный запас.   -  person Gakuo    schedule 30.04.2019
comment
Кстати, какая конкретная операция делает это медленным и, следовательно, делает важным распараллеливание? Есть ли внешний поиск (база данных или другая операция ввода-вывода)?   -  person Charles Duffy    schedule 30.04.2019
comment
Нет длительного процесса. Кажется, это вопрос, который вы задаете :). Я просто регистрируюсь на экране. Я также могу отключить регистрацию. Я думал, что, поскольку я обрабатываю много клиентов, распараллеливание приведет к повышению производительности. Общий ресурс stock, который должен быть согласованным, кажется узким местом.   -  person Gakuo    schedule 30.04.2019
comment
Вы видели демо с игрой жизни Конвея, реализованной на Clojure с использованием refs/dosync? Мне трудно поверить, что хорошо реализованный алгоритм, в котором всего 10 транзакций пытаются выполняться одновременно, застопорит дело, когда весь экран ячеек может вести себя правильно, когда каждая ячейка действует сама по себе. (Никто не захотел реализовать это таким образом, но это была довольно эффективная демонстрация того, насколько хорошо все это может работать вместе на практике).   -  person Charles Duffy    schedule 30.04.2019
comment
как выглядит ваша основная структура данных?   -  person pete23    schedule 30.04.2019
comment
Если все ваши транзакции занимают примерно одинаковое время, вы можете легко столкнуться с множеством коллизий в системе транзакций. Это приведет к перезапуску большого количества транзакций и, следовательно, к потреблению большого количества времени. Вы можете попробовать просмотреть историю ссылок для отладки. См. код стресс-теста по адресу gist.github.com/Chouser/456326 и (старый, но все же действительно) начало обсуждения clojure-log.n01se.net/ date/2010-06-28.html#16:02 для некоторого понимания.   -  person Stefan Kamphausen    schedule 30.04.2019
comment
Кроме того, улучшится ли ситуация, если удалить partition-all? И мое следующее решение, когда pmap просто недостаточно хорошо, обычно это reducers.   -  person Stefan Kamphausen    schedule 30.04.2019


Ответы (1)


Я предполагаю, что ваша транзакция блокирует инвентарь на весь жизненный цикл process-customer. Это будет медленно, так как все клиенты гонятся за одной и той же вселенной магазинов. Если вы можете разделить процесс на две фазы: 1) цитирование и 2) выполнение и применение транзакции только к (2), тогда производительность должна быть намного лучше. Или, если вы купитесь на agent программирование, у вас будет автоматически определена граница транзакции на уровне сообщения. Вот один образец, который вы можете рассмотреть:

(defn get-best-deal
  "Returns the best deal for a given order with given stores (agent)"
  [stores order]
  ;;
  ;; request for quotation from 1000 stores (in parallel)
  ;;
  (doseq [store stores]
    (send store get-quote order))
  ;;
  ;; wait for reply, up to 0.5s
  ;;
  (apply await-for 500 stores)
  ;;
  ;; sort and find the best store
  ;;
  (when-let [best-store (->> stores
                             (filter (fn [store] (get-in @store [:quotes order])))
                             (sort-by (fn [store] (->> (get-in @store [:quotes order])
                                                       vals
                                                       (reduce +))))
                             first)]
    {:best-store best-store
     :invoice-id (do
                   ;; execute the order
                   (send best-store fulfill order)
                   ;; wait for the transaction to complete
                   (await best-store)
                   ;; get an invoice id
                   (get-in @best-store [:invoices order]))}))

и найти лучшие предложения от 1000 магазинов на 100 заказов (всего 289 позиций) из 100 товаров:

(->> orders
       (pmap (partial get-best-deal stores))
       (filter :invoice-id)
       count
       time)
;; => 57
;; "Elapsed time: 312.002328 msecs"

Пример бизнес-логики:

(defn get-quote
  "issue a quote by checking inventory"
  [store {:keys [order-items] :as order}]
  (if-let [quote (->> order-items
                   (reduce reduce-inventory
                           {:store store
                            :quote nil})
                   :quote)]
    ;; has inventory to generate a quote
    (assoc-in store [:quotes order] quote)
    ;; no inventory
    (update store :quotes dissoc order)))

(defn fulfill
  "fulfill an order if previuosly quoted"
  [store order]
  (if-let [quote (get-in store [:quotes order])]
    ;; check inventory again and generate invoice
    (let [[invoice inventory'] (check-inventory-and-generate-invoice store order)]
      (cond-> store
        invoice (->
                  ;; register invoice
                  (assoc-in [:invoices order] invoice)
                  ;; invalidate the quote
                  (update :quotes dissoc order)
                  ;; update inventory
                  (assoc :inventory inventory'))))
    ;; not quoted before
    store))


person rmcv    schedule 04.05.2019
comment
Весь магазин теперь агент. У меня есть два вопроса. 1. Если потом запустить несколько потоков, их запросы на кавычки и фулфилы не будут выполняться по одному? 2. Если это так, то не является ли это последовательной реализацией? - person Gakuo; 07.05.2019
comment
С точки зрения магазина, единица работы выполняется после получения предложения. Затем он может сразу переключиться на обслуживание другого клиента/заказа. Покупатели могут не торопиться с принятием собственных решений о покупке, не блокируя ни один магазин. Это просто отражает то, как многопоточность/задача работает в реальном сценарии. - person rmcv; 08.05.2019
comment
Кстати, программирование агента — это всего лишь один из способов моделирования решения. Вы можете достичь аналогичной пропускной способности, если к вашему подходу применяется соответствующая степень детализации блокировки. Что-то вроде Row lock (один магазин) vs. Table lock (вселенная магазинов). - person rmcv; 08.05.2019