(defn fill-queue
"filler-func will be called in another thread with a single arg
'fill'. filler-func may call fill repeatedly with one arg each
time which will be pushed onto a queue, blocking if needed until
this is possible. fill-queue will return a lazy seq of the values
filler-func has pushed onto the queue, blocking if needed until each
next element becomes available. filler-func's return value is ignored."
([filler-func & optseq]
(let [opts (apply array-map optseq)
apoll (:alive-poll opts 1)
q (LinkedBlockingQueue. (:queue-size opts 1))
NIL (Object.) ;nil sentinel since LBQ doesn't support nils
weak-target (Object.)
alive? (WeakReference. weak-target)
fill (fn fill [x]
(if (.get alive?)
(if (.offer q (if (nil? x) NIL x) apoll TimeUnit/SECONDS)
x
(recur x))
(throw (Exception. "abandoned"))))
f (future
(try
(filler-func fill)
(finally
(.put q q))) ;q itself is eos sentinel
nil)] ; set future's value to nil
((fn drain []
weak-target ; force closing over this object
(lazy-seq
(let [x (.take q)]
(if (identical? x q)
@f ;will be nil, touch just to propagate errors
(cons (if (identical? x NIL) nil x)
(drain))))))))))
Used in 0 other vars
Comments top
No comments for fill-queue. Log in to add a comment.