这是代码:
(ns typedclj.async
(:require [clojure.core.async
:as a
:refer [>! <! >!! <!!
go chan buffer
close! thread
alts! alts!! timeout]]
[clj-http.client :as -cc]))
(time (dorun
(let [c (chan)]
(doseq [i (range 10 1e4)]
(go (>! c i))))))
我得到一个错误:
Exception in thread "async-dispatch-12" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:150)
at clojure.core.async.impl.ioc_macros$put_BANG_.invoke(ioc_macros.clj:959)
at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817$fn__11819.invoke(async.clj:19)
at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817.invoke(async.clj:19)
at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:940)
at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:944)
at typedclj.async$eval11807$fn__11816.invoke(async.clj:19)
at clojure.lang.AFn.run(AFn.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)...
根据http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/
1e4工作已经太多了吗?那么上限是多少?
最佳答案
我通常不喜欢这样,所以我希望你能原谅我一次违法行为:
在一个更加完美的世界中,每个程序员都会在 sleep 前和睡前第一件事重复五遍“没有无限队列这样的事情”。这种思维方式需要弄清楚系统中将如何处理背压,因此,当过程中某处出现速度下降时,之前的零件便可以找到它并降低响应速度。在core.async中,默认背压是立即的,因为默认缓冲区大小为零。除非有人准备消费,否则go go块不会成功地将某些东西放入chan中。
chan基本上看起来像这样:
"queue of pending puts" --> buffer --> "queue of pending takes"
推杆和接收器队列旨在为通过该管道进行通信的两个进程留出时间来安排自己的时间,从而可以取得进展。没有这些,线程将无法进行调度,并且将发生死锁。它们不打算用作缓冲区。多数民众赞成在中间的缓冲区是为了什么,这是背后的设计,使唯一的一个具有明确的大小。通过在chan中设置缓冲区的大小来显式设置系统的缓冲区大小:
user> (time (dorun
(let [c (chan 1e6)]
(doseq [i (range 10 1e4)]
(go (>! c i))))))
"Elapsed time: 83.526679 msecs"
nil
在这种情况下,我已经“计算”出,如果有多达一百万个等待的工作,我的系统整体将处于良好状态。当然,您的现实世界经验会有所不同,并且对您的情况非常独特。
谢谢你的耐心,