Дроссельные функции с core.async

Количество возможных исполнений функции должно быть ограничено. Таким образом, после вызова функции любой повторный вызов следует игнорировать в течение определенного периода времени. Если в это время есть вызовы, последний должен быть выполнен после периода времени.

Вот мой подход к core.async. Проблема здесь в том, что дополнительные звонки суммируются в канале c. Мне нужен канал только с одной позицией внутри, которая будет переопределена командой put! каждый раз.

(defn throttle [f time]
  (let [c (chan 1)]
    (go-loop []
      (apply f (<! c))
      (<! (timeout time))
      (recur))
    (fn [& args]
      (put! c (if args args [])))))

Применение:

(def throttled (throttle #(print %) 4000))
(doseq [x (range 10)]
   (throttled x))

; 0
;... after 4 seconds
; 9

Кто-нибудь знает, как это исправить?

Решение

(defn throttle [f time]
  (let [c (chan (sliding-buffer 1))]
    (go-loop []
      (apply f (<! c))
      (<! (timeout time))
      (recur))
    (fn [& args]
      (put! c (or args [])))))

person Anton Harald    schedule 26.02.2016    source источник
comment
github. com/swannodette/swannodette.github.com/blob/   -  person ClojureMostly    schedule 27.02.2016
comment
См. также: aleph.io/codox/manifold/manifold.stream.html #var-дроссель   -  person muhuk    schedule 27.02.2016


Ответы (4)


Чтобы решить вопрос о вашем канале, вы можете использовать чан со скользящим буфером:

user> (require '[clojure.core.async :as async])
nil
user> (def c (async/chan (async/sliding-buffer 1)))
#'user/c
user> (async/>!! c 1)
true
user> (async/>!! c 2)
true
user> (async/>!! c 3)
true
user> (async/<!! c)
3

таким образом, только последнее значение, помещенное в канал, будет вычислено на следующем интервале.

person Arthur Ulfeldt    schedule 26.02.2016
comment
ИМО, это не то, что такое дросселирование, если производитель и потребитель работают быстро, сообщения не отбрасываются. - person ClojureMostly; 27.02.2016
comment
Ответ не полный, но и не неверный. Комбинация некоторого потребителя с тайм-аутом и с использованием sliding-buffer даст полный ответ. - person muhuk; 27.02.2016

Вы можете использовать функцию debounce.

Скопирую сюда:

(defn debounce [in ms]
  (let [out (chan)]
    (go-loop [last-val nil]
      (let [val (if (nil? last-val) (<! in) last-val)
            timer (timeout ms)
            [new-val ch] (alts! [in timer])]
        (condp = ch
          timer (do (>! out val) (recur nil))
          in (recur new-val))))
    out))

Здесь, только когда in не отправил сообщение для ms, последнее значение, которое оно выдало, пересылается на канал out. Пока in продолжает выдавать сообщения без достаточно длинной паузы между выдачами, все сообщения, кроме последнего, постоянно отбрасываются.

Я тестировал эту функцию. Он ждет 4 секунды, а затем выводит 9, что почти то, что вы просили - требуется некоторая настройка!

(defn my-sender [to-chan values]
  (go-loop [[x & xs] values]
           (>! to-chan x)
           (when (seq xs) (recur xs))))

(defn my-receiver [from-chan f]
  (go-loop []
           (let [res (<! from-chan)]
             (f res)
             (recur))))

(defn setup-and-go []
  (let [in (chan)
        ch (debounce in 4000)
        sender (my-sender in (range 10))
        receiver (my-receiver ch #(log %))])) 

И это версия debounce, которая будет выводить в соответствии с требованиями вопроса, то есть 0 сразу, затем подождите четыре секунды, затем 9:

(defn debounce [in ms]
  (let [out (chan)]
    (go-loop [last-val nil
              first-time true]
             (let [val (if (nil? last-val) (<! in) last-val)
                   timer (timeout (if first-time 0 ms))
                   [new-val ch] (alts! [in timer])]
               (condp = ch
                 timer (do (>! out val) (recur nil false))
                 in (recur new-val false))))
    out)) 

Я использовал log, а не print, как вы. Вы не можете полагаться на обычные функции println/print с core.async. См. здесь для объяснения.

person Chris Murphy    schedule 26.02.2016
comment
выглядит более сложной конструкцией, которую я не могу сразу уловить. Разве мой подход не имеет смысла, за исключением уже упомянутого неправильного поведения? Я имею в виду, нельзя ли это как-то легко исправить? - person Anton Harald; 27.02.2016

Это взято у Дэвида Исходный код блога Nolens:

(defn throttle*
  ([in msecs]
    (throttle* in msecs (chan)))
  ([in msecs out]
    (throttle* in msecs out (chan)))
  ([in msecs out control]
    (go
      (loop [state ::init last nil cs [in control]]
        (let [[_ _ sync] cs]
          (let [[v sc] (alts! cs)]
            (condp = sc
              in (condp = state
                   ::init (do (>! out v)
                            (>! out [::throttle v])
                            (recur ::throttling last
                              (conj cs (timeout msecs))))
                   ::throttling (do (>! out v)
                                  (recur state v cs)))
              sync (if last 
                     (do (>! out [::throttle last])
                       (recur state nil
                         (conj (pop cs) (timeout msecs))))
                     (recur ::init last (pop cs)))
              control (recur ::init nil
                        (if (= (count cs) 3)
                          (pop cs)
                          cs)))))))
    out))

(defn throttle-msg? [x]
  (and (vector? x)
       (= (first x) ::throttle)))

(defn throttle
  ([in msecs] (throttle in msecs (chan)))
  ([in msecs out]
    (->> (throttle* in msecs out)
      (filter #(and (vector? %) (= (first %) ::throttle)))
      (map second))))

Возможно, вы также захотите добавить в канал преобразователь dedupe.

person ClojureMostly    schedule 27.02.2016
comment
Спасибо! Второй пример кода, показывающий практическое применение, сделает ваш ответ лучшим :) Также хотелось бы увидеть дедупликацию в действии. - person Petrus Theron; 18.03.2018

Мне нужно было передать функцию для захвата аргументов, потому что я использовал ее для события ввода, и она передавала изменяемый объект. ????

(defn throttle-for-mutable-args [time f arg-capture-fn]
  (let [c (async/chan (async/sliding-buffer 1))]
    (async-m/go-loop []
      (f (async/<! c))
      (async/<! (async/timeout time))
      (recur))
    (fn [& args]
      (async/put! c (apply arg-capture-fn (or args []))))))

И я использую как

[:input
  {:onChange (util/throttle-for-mutable-args                                      
               500
               #(really-use-arg %)                                 
               #(-> % .-target .-value))}]
person Jp_    schedule 23.05.2020