(import 'clojure.lang.PersistentQueue)

(defn build-lagging-transducer
  "Creates a transducer that always runs n items behind. This is convenient
  if the pipeline contains futures that you want to start deref-ing only
  once a certain number are in flight."
  [n]
  (fn [rf]
    (let [qv (volatile! PersistentQueue/EMPTY)]
      (fn
        ([] (rf))
        ;; completion: flush the queued items, then complete the downstream rf
        ([acc] (rf (reduce rf acc @qv)))
        ([acc v]
         (vswap! qv conj v)
         (if (< (count @qv) n)
           acc                       ; still filling the lag buffer
           (let [h (peek @qv)]       ; buffer full: pass on the oldest item
             (vswap! qv pop)
             (rf acc h))))))))
Multithreading within a transducer
pmap (parallelising map) is a Clojure function that applies a function to each of its inputs in parallel.
It is very easy to use; just change map to pmap and you are done.
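As a minimal sketch of that swap, assuming a made-up expensive function (slow-square is a hypothetical stand-in, not part of the original post):

```clojure
;; slow-square is a hypothetical stand-in for an expensive function.
(defn slow-square [x]
  (Thread/sleep 100)
  (* x x))

;; doall forces the lazy sequences so the work actually happens here.
(def serial   (doall (map  slow-square (range 8))))
(def parallel (doall (pmap slow-square (range 8))))

;; Same values in the same order; only the wall-clock time differs.
(= serial parallel)
```

Both calls return the squares in input order; pmap simply overlaps the sleeps.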
pmap does not work in a transducer. Why is this, and can we write an equivalent?
If you call (source pmap) you will see that it:
- uses map to lazily construct futures of the passed function call
- returns a lazy sequence that derefs these futures
- launches the first (availableProcessors + 2) futures immediately (but see the next item)
- is subject to chunking: when the input is a chunked sequence, the first 32 futures will automatically be realised, which could result in wasted computation
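The behaviour listed above can be sketched as follows. simple-pmap is a hypothetical name; its body paraphrases the single-collection arity that (source pmap) prints, so treat it as an illustration rather than the canonical source. The second half shows chunking with plain map, where the count is easy to observe.

```clojure
;; simple-pmap is a hypothetical name for this paraphrase of pmap.
(defn simple-pmap [f coll]
  (let [n    (+ 2 (.. Runtime getRuntime availableProcessors))
        futs (map #(future (f %)) coll)        ; lazy seq of futures
        step (fn step [[x & xs :as vs] fs]
               (lazy-seq
                 (if-let [s (seq fs)]
                   ;; fs runs n items ahead of vs, so forcing it starts
                   ;; futures before the head future is dereferenced
                   (cons (deref x) (step xs (rest s)))
                   ;; nothing left to start: deref whatever remains
                   (map deref vs))))]
    (step futs (drop n futs))))

(def incremented (doall (simple-pmap inc (range 10))))

;; Chunking, shown with plain map: asking for one element of a chunked
;; seq runs the function over the whole first chunk.
(def calls (atom 0))
(first (map (fn [x] (swap! calls inc) x) (range 100)))
@calls ; 32, the size of one chunk
```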
You absolutely can create futures inside a transducer! There are just two downsides:
- You will end up with a bunch of futures in your reducing function, which is not quite the easy-to-use thing offered by pmap.
- You only indirectly control the parallelisation (by the speed at which your reducing function runs).
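A small sketch of the first downside, using a trivial squaring function as a placeholder for real work: the collection that comes out of the transducer holds futures, and the caller has to deref them.

```clojure
;; Each input becomes a future; the reducing function (conj, via into)
;; receives the futures themselves, not their values.
(def results
  (into [] (map #(future (* % %))) (range 5)))

(every? future? results)   ; true: we collected futures

;; The caller is left to unwrap them.
(def values (mapv deref results))
```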
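Dereferencing each future in the very next step will get you the data you need, but will immediately remove any parallelisation, because each future is awaited before the next one is created.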
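To see why, here is a rough timing sketch. slow-identity is a hypothetical 100 ms task; because a transducer pushes one item through every step before taking the next, the deref blocks before the following future is even created.

```clojure
;; slow-identity is a hypothetical task that takes about 100 ms.
(defn slow-identity [x]
  (Thread/sleep 100)
  x)

(def start (System/nanoTime))

(def results
  (into []
        (comp (map #(future (slow-identity %)))
              (map deref))            ; blocks until this item's future is done
        (range 5)))

;; Five tasks, one after the other: roughly 5 x 100 ms in total.
(def elapsed-ms (/ (- (System/nanoTime) start) 1e6))
```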
What about dereferencing on a lag? Now you’re talking!
If we make the transducer run on a 6-item lag, we can keep 6 futures in flight at any time.
All we have to do is write a bit of code to create a lag. We can recruit clojure.lang.PersistentQueue to do this for us, which is what the build-lagging-transducer function shown earlier does.
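For readers who have not met PersistentQueue, a short sketch of its first-in-first-out behaviour and of the lag idea on plain values. lag-trace is a hypothetical helper written only for this illustration; it records when items enter the queue and when they are passed on.

```clojure
(import 'clojure.lang.PersistentQueue)

;; conj adds at the back; peek and pop work at the front.
(def q (conj PersistentQueue/EMPTY :a :b :c))
(peek q)        ; :a
(seq (pop q))   ; (:b :c)

;; lag-trace (hypothetical): push items through a queue and record the
;; order of :in and :out events for a lag of n.
(defn lag-trace [n items]
  (let [[q events]
        (reduce (fn [[q events] x]
                  (let [q      (conj q x)
                        events (conj events [:in x])]
                    (if (< (count q) n)
                      [q events]                                  ; still filling up
                      [(pop q) (conj events [:out (peek q)])])))  ; emit the oldest
                [PersistentQueue/EMPTY []]
                items)]
    ;; at the end, flush whatever is still queued
    (into events (map (fn [x] [:out x])) q)))

(lag-trace 3 (range 5))
;; [[:in 0] [:in 1] [:in 2] [:out 0] [:in 3] [:out 1] [:in 4] [:out 2] [:out 3] [:out 4]]
```

Nothing leaves the queue until the third item has arrived, after which one item goes out for each one that comes in.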
And we can use this to write the transducer generating function:
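(defn parallelising-map [f]
  (let [n (+ 2 (.. Runtime getRuntime availableProcessors))]
    (comp (map #(fn [] (f %)))           ; wrap each input in a zero-argument thunk
          (map future-call)              ; start the thunk on another thread
          (build-lagging-transducer n)   ; run n items behind
          (map deref))))                 ; block on the oldest future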
Let's define a function that we wish to multithread: something that sleeps for a random length of time.
(defn sleepy-fn [counter v]
  (let [ts (+ 500 (rand-int 500))]     ; sleep for 500-999 ms
    (println (str "starting " v))
    (Thread/sleep ts)
    (println (str "completed " v))
    [(swap! counter inc) v ts]))       ; [completion order, input, sleep time]
And now let's try it:
(let [counter (atom 0)]
  (into [] (parallelising-map #(sleepy-fn counter %)) (range 64)))
Note that this only works efficiently when tasks take similar lengths of time!
If quick tasks are stuck behind a long task, they will be held up. This is a fundamental consequence of the linear nature of transducers.
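The hold-up can be observed with plain futures, independent of any transducer. In this sketch the sleep durations (300 ms and 10 ms) are arbitrary assumptions; the point is that results consumed in order cannot overtake the slow task at the head.

```clojure
(def start (System/nanoTime))
(defn ms-since-start [] (/ (- (System/nanoTime) start) 1e6))

;; One slow task followed by two quick ones, all started at once.
;; Each future reports the time at which it finished.
(def futs
  (mapv (fn [ms] (future (Thread/sleep ms) (ms-since-start)))
        [300 10 10]))

;; Consume the results in order, noting when each one reaches us.
(def report
  (mapv (fn [fut] {:finished-at @fut :consumed-at (ms-since-start)})
        futs))

;; The quick tasks finish early but are only consumed after the slow
;; one at the head has been dereferenced.
(second report)
```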
Well, you could submit every task to a fixed thread pool up front and deref the results only at the end, something like:
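(import '(java.util.concurrent Callable Executors ExecutorService))

(let [counter (atom 0)
      n (+ 2 (.. Runtime getRuntime availableProcessors))
      ^ExecutorService exec (Executors/newFixedThreadPool n)]
  (try
    (transduce
      (comp (map #(fn [] (sleepy-fn counter %)))
            ;; hint Callable so the returned Future carries the result
            (map #(.submit exec ^Callable %)))
      (completing conj #(mapv deref %))   ; deref in order at the end
      (range 64))
    (finally (.shutdown exec))))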
But it doesn’t avoid the fundamental problem: if we want the results of sleepy-fn in order, then we are going to have to wait for them.