以下代码运行大约需要20秒。但是,在取消注释do!
之后花了不到一秒钟的时间。为什么会有如此巨大的差异?
更新:
使用ag.Add
时需要9秒钟。我已经更新了代码。
open FSharpx.Control
let test () =
let ag = new BlockingQueueAgent<int option>(500)
let enqueue() = async {
for i = 1 to 500 do
//do! ag.AsyncAdd (Some i) // less than a second with do!
ag.AsyncAdd (Some i) // it takes about 20 seconds without do!
//ag.Add (Some i) // This one takes about 9 seconds
//printfn "=> %d" i
}
async {
do! [ for i = 1 to 100 do yield enqueue() ]
|> Async.Parallel |> Async.Ignore
for i = 1 to 5 do ag.Add None
} |> Async.Start
let rec dequeue() =
async {
let! m = ag.AsyncGet()
match m with
| Some v ->
//printfn "<= %d" v
return! dequeue()
| None ->
printfn "Done"
}
[ for i = 1 to 5 do yield dequeue() ]
|> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
0
最佳答案
从this question继续。这是根据您的代码进行的实验:
// Learn more about F# at http://fsharp.org
module Test.T1
open System
open System.Collections.Generic
open System.Diagnostics
type Msg<'T> =
| AsyncAdd of 'T * AsyncReplyChannel<unit>
| Add of 'T
| AsyncGet of AsyncReplyChannel<'T>
let sw = Stopwatch()
let mutable scanned = 0
let mutable scanTimeStart = 0L
let createQueue maxLength = MailboxProcessor.Start(fun inbox ->
let queue = new Queue<'T>()
let rec emptyQueue() =
inbox.Scan(fun msg ->
match msg with
| AsyncAdd(value, reply) -> Some(enqueueAndContinueWithReply(value, reply))
| Add(value) -> Some(enqueueAndContinue(value))
| _ -> None )
and fullQueue() =
scanTimeStart <- sw.ElapsedMilliseconds
inbox.Scan(fun msg ->
scanned <- scanned + 1
match msg with
| AsyncGet(reply) ->
Some(dequeueAndContinue(reply))
| _ -> None )
and runningQueue() = async {
let! msg = inbox.Receive()
scanTimeStart <- sw.ElapsedMilliseconds
match msg with
| AsyncAdd(value, reply) -> return! enqueueAndContinueWithReply(value, reply)
| Add(value) -> return! enqueueAndContinue(value)
| AsyncGet(reply) -> return! dequeueAndContinue(reply) }
and enqueueAndContinueWithReply (value, reply) = async {
reply.Reply()
queue.Enqueue(value)
return! chooseState() }
and enqueueAndContinue (value) = async {
queue.Enqueue(value)
return! chooseState() }
and dequeueAndContinue (reply) = async {
let timestamp = sw.ElapsedMilliseconds
printfn "[AsyncGet] messages cnt/scanned: %d/%d, timestamp/scanTime: %d/%d" inbox.CurrentQueueLength scanned timestamp (timestamp - scanTimeStart)
scanned <- 0
reply.Reply(queue.Dequeue())
return! chooseState() }
and chooseState() =
if queue.Count = 0 then emptyQueue()
elif queue.Count < maxLength then runningQueue()
else fullQueue()
emptyQueue())
let mb = createQueue<int option> 500
let addWithReply v = mb.PostAndAsyncReply(fun ch -> AsyncAdd(v, ch))
let addAndForget v = mb.Post(Add v)
let get() = mb.PostAndAsyncReply(AsyncGet)
[<EntryPoint>]
let main args =
sw.Start()
let enqueue() = async {
for i = 1 to 500 do
//do! ag.AsyncAdd (Some i) // less than a second with do!
addWithReply (Some i) // it takes about 20 seconds without do!
//addAndForget(Some i)
//ag.Add (Some i) // This one takes about 9 seconds
//printfn "=> %d" i
}
async {
do! [ for i = 1 to 100 do yield enqueue() ]
|> Async.Parallel |> Async.Ignore
for i = 1 to 5 do addAndForget None
} |> Async.Start
let rec dequeue() =
async {
let! m = get()
match m with
| Some v ->
//printfn "<= %d" v
return! dequeue()
| None ->
printfn "Done"
}
[ for i = 1 to 5 do yield dequeue() ]
|> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
sw.Stop()
printfn "Totally ellapsed: %dms" sw.ElapsedMilliseconds
0
addWithReply是AsyncAdd。当我们不做就跑!输出是(部分):
...
[AsyncGet] messages cnt/scanned: 48453/48450, timestamp/scanTime: 3755/6
[AsyncGet] messages cnt/scanned: 48452/48449, timestamp/scanTime: 3758/3
[AsyncGet] messages cnt/scanned: 48451/48448, timestamp/scanTime: 3761/3
[AsyncGet] messages cnt/scanned: 48450/48447, timestamp/scanTime: 3764/3
...
如您所见,无需做!您基本上将所有50000个入队请求添加到邮箱的邮件队列中。这里出队线程较慢,仅将其请求放在消息末尾。输出状态的最后一行是邮箱中有48450条消息,项目队列已满(500个项目),为了释放一个空间,我们需要扫描48447条消息-因为它们都是AsyncAdd,而不是AsyncGet。 scanTime是2-3ms(在我的机器上)-来自MailboxProcessor.Scan的大概时间。
当我们添加do!时,消息队列具有不同的形状(请参见输出):
[AsyncGet] messages cnt/scanned: 98/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 96/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 104/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 102/96, timestamp/scanTime: 1561/0
消息队列中的消息数〜入队线程数,因为每个线程现在都在等待。
从实验中我还无法理解,是当您将AsyncAdd更改为Add时,您仍然对MailboxProcessor进行了垃圾邮件处理:
[AsyncGet] messages cnt/scanned: 47551/47548, timestamp/scanTime: 3069/1
[AsyncGet] messages cnt/scanned: 47550/47547, timestamp/scanTime: 3070/1
[AsyncGet] messages cnt/scanned: 47549/47546, timestamp/scanTime: 3073/3
[AsyncGet] messages cnt/scanned: 47548/47545, timestamp/scanTime: 3077/2
但平均花费在扫描上的时间约为1毫秒-比使用AsyncReplyChannel更快。我的想法-这与AsyncReplyChannel的实现方式有关。它依赖于ManualResetEvent,因此在内部每个进程可能会有另一个此类事件的队列,并且每个AsyncGet在创建AsyncReplyChannel时都应扫描此队列。
关于f# - 火与不等待(不做!)与火与等待(不做!)取得了巨大的差异表现?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/57652524/