对于需要处理的两个非常大的数据集,我有两个不同的功能,最后归结为两个 bool 值。然后需要将这些值加在一起以得到最终结果。我的问题是创建线程以便两个长函数可以同时运行的最佳方法是什么。我的想法就像
(def f (future longProcessOne(data_one)))
(def g (future longProcessTwo(data_two)))
(and @f @g)
但我一直在寻找更好的方法来解决此问题。
最佳答案
(顶部的基于 promise 的方法,而基于core.async的方法则降低了。两者都在第一个虚假值上短路。)
这是一个利用单个 promise 可以多次交付的事实的版本(尽管只有第一次交付才能成功设置其值;后续交付只是返回nil
而没有副作用)。
(defn thread-and
"Computes logical conjunction of return values of fs, each of which
is called in a future. Short-circuits (cancelling the remaining
futures) on first falsey value."
[& fs]
(let [done (promise)
ret (atom true)
fps (promise)]
(deliver fps (doall (for [f fs]
(let [p (promise)]
[(future
(if-not (swap! ret #(and %1 %2) (f))
(deliver done true))
(locking fps
(deliver p true)
(when (every? realized? (map peek @fps))
(deliver done true))))
p]))))
@done
(doseq [[fut] @fps]
(future-cancel fut))
@ret))
一些测试:
(thread-and (constantly true) (constantly true))
;;= true
(thread-and (constantly true) (constantly false))
;;= false
(every? false?
(repeatedly 100000
#(thread-and (constantly true) (constantly false))))
;;= true
;; prints :foo, but not :bar
(thread-and #(do (Thread/sleep 1000) (println :foo))
#(do (Thread/sleep 3000) (println :bar)))
将亚瑟(Arthur)和A.韦伯(A. Webb)的想法放在一起,可以将core.async和和结果一起使用,同时缩短返回的第一个false值:
(defn thread-and
"Call each of the fs on a separate thread. Return logical
conjunction of the results. Short-circuit (and cancel the calls
to remaining fs) on first falsey value returned."
[& fs]
(let [futs-and-cs
(doall (for [f fs]
(let [c (chan)]
[(future (>!! c (f))) c])))]
(loop [futs-and-cs futs-and-cs]
(if (seq futs-and-cs)
(let [[result c] (alts!! (map peek futs-and-cs))]
(if result
(recur (remove #(identical? (peek %) c)
futs-and-cs))
(do (doseq [fut (map first futs-and-cs)]
(future-cancel fut))
false)))
true))))
用
(constantly false)
和(constantly true)
测试:(thread-and (constantly true) (constantly true))
;= true
(thread-and (constantly true) (constantly false))
;= false
;;; etc.
另请注意,短路确实有效:
;;; prints :foo before returning false
(thread-and #(do (Thread/sleep 3000) false)
#(do (Thread/sleep 1000) (println :foo)))
;;; does not print :foo
(thread-and #(do (Thread/sleep 3000) false)
#(do (Thread/sleep 7000) (println :foo)))