ClojureDocs

Nav

Namespaces

pub

  • (pub ch topic-fn)
  • (pub ch topic-fn buf-fn)
Creates and returns a pub(lication) of the supplied channel,
partitioned into topics by the topic-fn. topic-fn will be applied to
each value on the channel and the result will determine the 'topic'
on which that value will be put. Channels can be subscribed to
receive copies of topics using 'sub', and unsubscribed using
'unsub'. Each topic will be handled by an internal mult on a
dedicated channel. By default these internal channels are
unbuffered, but a buf-fn can be supplied which, given a topic,
creates a buffer with desired properties.
 Each item is distributed to all subs in parallel and synchronously,
i.e. each sub must accept before the next item is distributed. Use
buffering/windowing to prevent slow subs from holding up the pub.
 Items received when there are no matching subs get dropped.
 Note that if buf-fns are used then each topic is handled
asynchronously, i.e. if a channel is subscribed to more than one
topic it should not expect them to be interleaved identically with
the source.
5 Examples
;; Source channel: values put here are routed by the publication below.
user=> (def c (chan 1))
#'user/c

;; Despite its name, sub-c holds the publication returned by pub.
;; The topic of each value is the result of (:route value).
user=> (def sub-c (pub c :route))
#'user/sub-c

user=> (def cx (chan 1))
#'user/cx

;; cx is subscribed to the :up-stream topic.
user=> (sub sub-c :up-stream cx)
#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@526eb67f>

user=> (def cy (chan 1))
#'user/cy

;; cy is subscribed to the :down-stream topic.
user=> (sub sub-c :down-stream cy)
#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@777692ac>

;; The body contains no recur, so this takes a single value from cx,
;; prints once, and then finishes.
user=> (go-loop [_ (<! cx)]
         (println "Got something coming up!"))
#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@49e2e0f9>

;; Same one-shot pattern for cy.
user=> (go-loop [_ (<! cy)]
         (println "Got something going down!"))
#<ManyToManyChannel clojure.core.async.impl.channels.ManyToManyChannel@33498147>

;; In this transcript the printed line appears after `true` for the first
;; put and before it for the second; presumably the print happens
;; asynchronously relative to the REPL echo, so the order is not fixed.
user=> (put! c {:route :up-stream :data 123})
true
Got something coming up!

user=> (put! c {:route :down-stream :data 123})
Got something going down!
true
;; Source channel for all news items.
(def news (chan 1))
;; Publication over `news`; the topic of each item is its :topics value.
(def shouter (pub news :topics))

(def alice (chan 1))
(def bob (chan 1))
(def clyde (chan 1))

(sub shouter :celebrity-gossip alice)
(sub shouter :space-x bob)
(sub shouter :space-x clyde)

;; Each listener keeps taking until its channel is closed (<! returns nil).
;; A go-loop without recur would take exactly one item and then stop.
(go-loop []
  (when-some [heard (<! alice)]
    (println "alice heard: " heard)
    (recur)))
(go-loop []
  (when-some [heard (<! bob)]
    (println "bob heard: " heard)
    (recur)))
(go-loop []
  (when-some [heard (<! clyde)]
    (println "clyde heard: " heard)
    (recur)))

(put! news {:topics :celebrity-gossip :data "omg she's prego!"})
(put! news {:topics :space-x :data "omg we're landing!"})

; notice: bob and clyde both "heard" the space-x news
; Why bob can seem to hear only while clyde is listening:
; a pub distributes each item to all subs of a topic synchronously, i.e.
; every sub must accept the item before the next one is distributed.
; If clyde stops taking and his (chan 1) buffer is full, the :space-x topic
; stalls and bob receives nothing further either. Keep every subscriber
; consuming (as the loops above do), give it a larger/windowed buffer,
; or unsub it.
(defn monitor!
  "Subscribes a fresh (chan 1) to `topic` on `publisher` and starts a go-loop
  that prints `topic - message` for each value received, using the value's
  :message key. The loop ends when the take returns nil (or false).
  Returns the channel produced by the go-loop."
  [publisher topic]
  (let [inbox (chan 1)]
    (async/sub publisher topic inbox)
    (go-loop []
      (when-let [event (<! inbox)]
        (let [{:keys [message]} event]
          (println topic \- message))
        (recur)))))

(defn start
  "Builds a publication keyed on :source, attaches monitors for :alice and
  :bob, publishes four messages (the :john one has no monitor attached),
  then closes the source channel. Returns the result of close!."
  []
  (let [outbox    (chan 1)
        publisher (async/pub outbox :source)]
    (doseq [topic [:alice :bob]]
      (monitor! publisher topic))
    ;; Messages are put in this exact order.
    (doseq [msg [{:source :alice :message "Alice says Hi!"}
                 {:source :bob :message "Bob says Hi!"}
                 {:source :john :message "John says Hi!"}
                 {:source :alice :message "Alice says Hi again"}]]
      (put! outbox msg))
    (close! outbox)))

(start)
=> nil
:alice - Alice says Hi!
:bob - Bob says Hi!
:alice - Alice says Hi again
(defn pub-test
  "Shows how a subscription on one topic can hold up delivery on another.
  `bufsize` is the size of the per-topic buffer handed to pub; when it is
  zero, pub is called without a buf-fn. Taken values are sent to tap>."
  [bufsize]
  (let [source      (chan)
        ;; A new buffer is created on every call of the buf-fn.
        buf-fn      (fn [_topic] (async/buffer bufsize))
        publication (if (zero? bufsize)
                      (async/pub source first)
                      (async/pub source first buf-fn))
        slow-sub    (chan)
        starved-sub (chan)
        ;; Puts [topic 0] .. [topic 4] onto the source channel.
        put-five!   (fn [topic]
                      (doseq [n (range 5)]
                        (async/put! source [topic n])))]
    (async/sub publication :blocking slow-sub)
    (async/sub publication :blocked starved-sub)
    ;; :ignored has no subscriber, so these do not block the pub.
    (put-five! :ignored)
    ;; slow-sub holds up starved-sub until its items are consumed or buffered.
    (put-five! :blocking)
    (put-five! :blocked)
    ;; Only two of the five :blocking items are taken; three are left.
    (dotimes [_ 2] (async/take! slow-sub tap>))
    ;; Closing slow-sub at this point would unblock starved-sub:
    ;; (async/close! slow-sub)
    (dotimes [_ 5] (async/take! starved-sub tap>))))

(pub-test 0) ; blocks: 5 puts - 0 buffered - 1 held on the inner chan - 2 taken = 2 left over (> 0)
[:blocking 0]
[:blocking 1]
=> nil
(pub-test 1) ; blocks because 5 - 1 - 1 - 2 > 0
[:blocking 0]
[:blocking 1]
=> nil
(pub-test 2) ; doesn't block because 5 - 2 - 1 - 2 = 0
[:blocking 0]
[:blocking 1]
[:blocked 0]
[:blocked 1]
[:blocked 2]
[:blocked 3]
[:blocked 4]
=> nil
;; Source channel for all news items.
(def news (chan 1))

;; Publication over `news`; the topic of each item is its :topics value.
(def shouter (pub news :topics))

(def alice (chan 1))
(def bob (chan 1))
(def clyde (chan 1))

(sub shouter :gossip alice)
(sub shouter :space bob)
(sub shouter :space clyde)

;; Each listener loops until its channel is closed. A take from a closed
;; channel returns nil immediately, so an unconditional (recur) would spin
;; forever printing nil; when-some ends the loop instead.
(go-loop []
  (when-some [heard (<! alice)]
    (prn "Alice heard: " heard)
    (recur)))
(go-loop []
  (when-some [heard (<! bob)]
    (prn "Bob heard: " heard)
    (recur)))
(go-loop []
  (when-some [heard (<! clyde)]
    (prn "Clyde heard: " heard)
    (recur)))

(put! news {:topics :gossip :data "she prego"})

(put! news {:topics :space :data "astroid!"})
See Also

sub: Subscribes a channel to a topic of a pub. By default the channel will be closed when the source closes, but this can be controlled with the close? parameter.

Added by jw-00000
0 Notes
No notes for pub