Когда канал будет отброшен, если поток продолжает его использовать?

Рассмотрим следующий фрагмент кода, взятый из пример обхода core.async :

(let [c1 (chan)
      c2 (chan)]
   (thread 
      (while true
         (let [[v ch] (alts!! [c1 c2])]
              (println "Read" v "from" ch))))
   (>!! c1 "hi")
   (>!! c2 "there"))

Я предполагаю, что поток имеет ссылку на оба канала c1 и c2 и в основном будет работать вечно, пытаясь получить значения из любого из них, которые никогда не поступят. Таким образом, ни каналы не будут собирать мусор, ни поток не завершится. Даже если мы явно close! каналов, поток все равно будет продолжаться. Верен ли мой вывод или я что-то упускаю?

Я спрашиваю, потому что пытаюсь найти способ успешно протестировать такой код core.async с таким бесконечно работающим потребителем. Моя текущая попытка выглядит так:

(let [c1 (chan)
      c2 (chan)]
    (go 
        (>!! c1 "hi")
        (>!! c2 "there"))
    (async/thread
      (loop [[v ch] (alts!! [c1 c2])]
        (println "Read" v "from" ch)
        (when-let [[nv nch] (alts!! [c1 c2])]
          (if nv
             (recur [nv nch])
             :done)))))

Это возвращает канал результата (из thread), для которого я хотел бы заблокировать значение :done, но мне нужен способ закрытия (по крайней мере одного из) каналов. Я мог бы вернуть список двух каналов c1, c2 и канал результата, возвращенный thread, а затем close!, например. c1 после этого и проверьте канал результата, но это очень уродливо:

(let [c1 (chan)
      c2 (chan)]
    (go 
        (>!! c1 "hi")
        (>!! c2 "there"))
    [c1 c2 (async/thread
      (loop [[v ch] (alts!! [c1 c2])]
        (println "Read" v "from" ch)
        (when-let [[nv nch] (alts!! [c1 c2])]
          (if nv
              (recur [nv nch])
              :done))))])
=> [#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e> #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@136535df>]
   Read hi from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@60eb5def>
   Read there from #<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@7c64279e>

(let [[c1 c2 resultchan] *1]
  (close! c1)
  (<!! resultchan))
=>:done

В качестве альтернативы я мог бы, вероятно, отправить специальное значение «Конец связи», которое я мог бы затем проверить на принимающей стороне.

Как выглядит наилучшая практика для этого?


person schaueho    schedule 24.06.2015    source источник
comment
блок go предназначен для использования вместе с асинхронными функциями (одиночными!), вы используете их с синхронными функциями (!!), которые все равно должны работать, но это не имеет особого смысла   -  person Viktor K.    schedule 24.06.2015
comment
Разница больше между блокирующей (!!) и парковочной (!) версиями. Внутри блоков go разрешены оба варианта. Кроме того, для проблемы, которую я пытаюсь решить, не имеет значения, использую ли я >!! или >!.   -  person schaueho    schedule 24.06.2015


Ответы (2)


Я не знаю, есть ли такая вещь, как лучшая практика для этого конкретного случая. Вот мое решение проблемы. Я думаю, это довольно просто.

(defn alts-while-open [f & chans]
   (let [a-chans (atom (set chans))]
     (go (while (< 0 (count @a-chans))
           (println "iteration started : " (vec @a-chans))
           (let [[v ch] (alts! (vec @a-chans))]
             (if v
               (f v ch)
               (swap! a-chans #(disj % ch))))))))

Функция f выполняется по результату alts! Пока каналы открыты. Держу здесь атом с множеством открытых каналов. Как только я нахожу закрытый канал, я удаляю его из этого набора. Если открытых каналов больше нет, цикл while останавливается. Вы можете запустить его:

(def c1 (chan))
(def c2 (chan))
(def c3 (chan))
(alts-while-open (fn [v ch] (println v)) c1 c2 c3)

Теперь, когда что-то записывается в любой из этих каналов, оно распечатывается. Вы можете видеть, что после этого начинается новая итерация. Как только вы закроете канал, вы увидите, что итерация началась, но вектор каналов уменьшился. Как только все каналы закрыты, цикл while останавливается.

Трудно ответить на вопрос, использовать ли close! функция или какой-либо другой механизм уведомления, чтобы остановить цикл. Я думаю, это зависит от ситуации. Если бы не сложная обработка «конца связи», я бы просто закрыл! канал. Если есть более сложная логика - например. есть успешные варианты «конец связи» и неудачный «конец связи», и я хочу обрабатывать их по-разному, тогда я бы предпочел отправить специальное сообщение. Что-то вроде

[:end-of-communication :success]
person Viktor K.    schedule 24.06.2015

Идея с отправкой специального end-of-communication не работает, когда вы не отправляете простые значения, потому что вы не можете гарантировать, что значения будут помещаться и приниматься в том порядке, в котором вы их ожидаете.

Следующая идея заключается в том, что отправитель и получатель заранее знают количество значений для обработки, например. как это:

user> (<!!
        (let [c1 (chan)
              values ["hi" "there"]
              vcount (count values)]
           (doseq [value values]
             (thread
                  (>!! c1 value)))
           (thread
               (loop [recvalue (<!! c1)
                      reccount 1]
                  (println "Read" recvalue)
                  (if (= reccount vcount)
                      (do (close! c1)
                          :done)
                      (recur (<!! c1) (inc reccount)))))))
Read hi
Read there
:done

Это работает в принципе, но имеет очевидный недостаток, заключающийся в том, что вы должны согласовать сумму перед настройкой процессов отправки и получения, и менее очевидный недостаток, заключающийся в том, что в случае, если что-то пойдет не так на стороне отправки, вы в конечном итоге будете бесконечно ждать. снова на принимающей стороне (при условии, что отправляющая сторона делает больше, чем просто помещает значения в канал).

Я пришел к выводу, что единственный способ сделать это надежным — использовать канал timeout, например:

(<!! (let [c1 (chan)
           tchan (timeout 1000) 
           values ["hi" "there"]]
       (doseq [value values]
         (thread
           (>!! c1 value)))
       (thread
          (loop [[recvalue rchan] (alts!! [c1 tchan])
                 timeoutchan tchan]
            (if (= rchan timeoutchan)
                (do (close! c1)
                    :done)
                (do (println "Read" recvalue)
                    (let [newtimeout (timeout 1000)]
                        (recur (alts!! [c1 newtimeout])
                               newtimeout))))))

Read hi
Read there
:done
person schaueho    schedule 25.06.2015