core.async переподключение веб-сокета

core.async нуб здесь, но пытаюсь учиться, создавая автоматически переподключающийся веб-сокет. Идея состоит в том, что сокет абстрагируется, так что любому, кто его использует, не нужно беспокоиться о том, подключен он или нет, и он может просто помещать сообщения в канал. Если сокет подключен, сообщения сразу считываются из канала и отправляются. Если сокет в настоящее время не подключен, он неоднократно пытается переподключиться и ожидает отправки всех ожидающих сообщений после установления соединения.

Это то, что у меня есть до сих пор:

(ns frontend.socket
  (:require-macros [cljs.core.async.macros :as asyncm :refer [go]])
  (:require [cljs.core.async :as async :refer [<! >! chan timeout]]))

(def endpoint "ws://localhost:8080")
(def socket (atom nil))
(def open (chan))
(def in (chan))
(def out (chan))
(def error (chan))
(def close (chan))

(defn open? []
  (= (.-readyState @socket) (.-OPEN @socket)))

(defn event>chan
  "Given a core.async channel, returns a
  function which takes an event and feeds
  it into the formerly given channel."
  [channel]
  (fn [e]
    (go (>! channel e))))

(defn connect
  "Opens a websocket, stores it in the socket
  atom, and feeds the socket's events into
  the corresponding channels."
  []
  (let [s (js/WebSocket. endpoint)]
    (reset! socket s)
    (set! s.onopen    (event>chan open))
    (set! s.onmessage (event>chan in))
    (set! s.onerror   (event>chan error))
    (set! s.onclose   (event>chan close))))

(defn init
  "This is the entry point from outside this
  namespace. Eagerly connects and then, if
  ever disconnected, attempts to reconnect
  every second. Also takes messages from
  the out channel and sends them through
  the socket."
  []
  (go
    (while true
      (connect)
      (<! close)
      (<! (timeout 1000))))
  (go
    (while true
      (let [m (<! out)]
        (when (or (open?) (<! open))
          (.send @socket m))))))

Логика повторного подключения работает нормально, но я столкнулся с проблемой, пытаясь отправить сообщения после закрытия сокета. В последних нескольких строках я проверяю, что перед отправкой сообщения сокет открыт, или жду его открытия. Проблема в том, что если сообщение не было отправлено, пока сокет был ранее открыт, затем закрыт, а затем отправлено сообщение, на открытом канале все еще что-то находится, и поэтому сообщение будет отправлено независимо. Так что открытый канал может как бы «устареть».

Я думал брать из открытого канала всякий раз, когда сокет закрывается, но это просто решает проблему: тогда открытый канал может пропустить значения, когда они мне нужны, и сообщения не отправляются при повторном подключении.

Что я здесь делаю неправильно?


person weltschmerz    schedule 15.12.2019    source источник


Ответы (1)


Это был неправильный подход, потому что каналы будут ждать, чтобы их использовали. Вместо этого мне нужно было использовать pub/sub: https://github.com/clojure/core.async/wiki/Pub-Sub/549da1843c7bbe6009e9904eed49f91000d8ce2c

person weltschmerz    schedule 23.12.2019