(defn seque
"Creates a queued seq on another (presumably lazy) seq s. The queued
seq will produce a concrete seq in the background, and can get up to
n items ahead of the consumer. n-or-q can be an integer n buffer
size, or an instance of java.util.concurrent BlockingQueue. Note
that reading from a seque can block if the reader gets ahead of the
producer."
{:added "1.0"
:static true}
([s] (seque 100 s))
([n-or-q s]
(let [^BlockingQueue q (if (instance? BlockingQueue n-or-q)
n-or-q
(LinkedBlockingQueue. (int n-or-q)))
NIL (Object.) ;nil sentinel since LBQ doesn't support nils
agt (agent (seq s))
fill (fn [s]
(try
(loop [[x & xs :as s] s]
(if s
(if (.offer q (if (nil? x) NIL x))
(recur xs)
s)
(.put q q))) ; q itself is eos sentinel
(catch Exception e
(.put q q)
(throw e))))
drain (fn drain []
(lazy-seq
(let [x (.take q)]
(if (identical? x q) ;q itself is eos sentinel
(do @agt nil) ;touch agent just to propagate errors
(do
(send-off agt fill)
(cons (if (identical? x NIL) nil x) (drain)))))))]
(send-off agt fill)
(drain))))
Vars in clojure.core/seque:
Used in 0 other vars
Comments top
No comments for seque. Log in to add a comment.