以下代码运行大约需要20秒。但是,在取消注释do!之后花了不到一秒钟的时间。为什么会有如此巨大的差异?

更新:
使用ag.Add时需要9秒钟。我已经更新了代码。

open FSharpx.Control

let test () =
    let ag = new BlockingQueueAgent<int option>(500)

    let enqueue() = async {
        for i = 1 to 500 do
            //do! ag.AsyncAdd (Some i) // less than a second with do!
            ag.AsyncAdd (Some i)       // it takes about 20 seconds without do!
            //ag.Add (Some i)          // This one takes about 9 seconds
            //printfn "=> %d" i
            }

    async {
        do! [ for i = 1 to 100 do yield enqueue() ]
            |> Async.Parallel |> Async.Ignore
        for i = 1 to 5 do ag.Add None
    } |> Async.Start

    let rec dequeue() =
        async {
            let! m = ag.AsyncGet()
            match m with
            | Some v ->
                //printfn "<= %d" v
                return! dequeue()
            | None ->
                printfn "Done"
        }

    [ for i = 1 to 5 do yield dequeue() ]
    |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
    0

最佳答案

this question继续。这是根据您的代码进行的实验:

// Learn more about F# at http://fsharp.org
module Test.T1

open System
open System.Collections.Generic
open System.Diagnostics

type Msg<'T> =
    | AsyncAdd of 'T * AsyncReplyChannel<unit>
    | Add of 'T
    | AsyncGet of AsyncReplyChannel<'T>

let sw = Stopwatch()
let mutable scanned = 0
let mutable scanTimeStart = 0L
let createQueue maxLength = MailboxProcessor.Start(fun inbox ->
    let queue = new Queue<'T>()
    let rec emptyQueue() =
        inbox.Scan(fun msg ->
          match msg with
          | AsyncAdd(value, reply) -> Some(enqueueAndContinueWithReply(value, reply))
          | Add(value) -> Some(enqueueAndContinue(value))
          | _ -> None )
    and fullQueue() =
        scanTimeStart <- sw.ElapsedMilliseconds
        inbox.Scan(fun msg ->
          scanned <- scanned + 1
          match msg with
          | AsyncGet(reply) ->
            Some(dequeueAndContinue(reply))
          | _ -> None )
    and runningQueue() = async {
        let! msg = inbox.Receive()
        scanTimeStart <- sw.ElapsedMilliseconds
        match msg with
        | AsyncAdd(value, reply) -> return! enqueueAndContinueWithReply(value, reply)
        | Add(value) -> return! enqueueAndContinue(value)
        | AsyncGet(reply) -> return! dequeueAndContinue(reply) }
    and enqueueAndContinueWithReply (value, reply) = async {
        reply.Reply()
        queue.Enqueue(value)
        return! chooseState() }
    and enqueueAndContinue (value) = async {
        queue.Enqueue(value)
        return! chooseState() }
    and dequeueAndContinue (reply) = async {
        let timestamp = sw.ElapsedMilliseconds
        printfn "[AsyncGet] messages cnt/scanned: %d/%d, timestamp/scanTime: %d/%d" inbox.CurrentQueueLength scanned timestamp (timestamp - scanTimeStart)
        scanned <- 0
        reply.Reply(queue.Dequeue())
        return! chooseState() }
    and chooseState() =
        if queue.Count = 0 then emptyQueue()
        elif queue.Count < maxLength then runningQueue()
        else fullQueue()
    emptyQueue())
let mb = createQueue<int option> 500
let addWithReply v = mb.PostAndAsyncReply(fun ch -> AsyncAdd(v, ch))
let addAndForget v = mb.Post(Add v)
let get() = mb.PostAndAsyncReply(AsyncGet)


[<EntryPoint>]
let main args =
    sw.Start()
    let enqueue() = async {
        for i = 1 to 500 do
            //do! ag.AsyncAdd (Some i) // less than a second with do!
            addWithReply (Some i)       // it takes about 20 seconds without do!
            //addAndForget(Some i)
            //ag.Add (Some i)          // This one takes about 9 seconds
            //printfn "=> %d" i
            }

    async {
        do! [ for i = 1 to 100 do yield enqueue() ]
            |> Async.Parallel |> Async.Ignore
        for i = 1 to 5 do addAndForget None
    } |> Async.Start

    let rec dequeue() =
        async {
            let! m = get()
            match m with
            | Some v ->
                //printfn "<= %d" v
                return! dequeue()
            | None ->
                printfn "Done"
        }

    [ for i = 1 to 5 do yield dequeue() ]
    |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
    sw.Stop()
    printfn "Totally ellapsed: %dms" sw.ElapsedMilliseconds
    0


addWithReply是AsyncAdd。当我们不做就跑!输出是(部分):

...
[AsyncGet] messages cnt/scanned: 48453/48450, timestamp/scanTime: 3755/6
[AsyncGet] messages cnt/scanned: 48452/48449, timestamp/scanTime: 3758/3
[AsyncGet] messages cnt/scanned: 48451/48448, timestamp/scanTime: 3761/3
[AsyncGet] messages cnt/scanned: 48450/48447, timestamp/scanTime: 3764/3
...


如您所见,无需做!您基本上将所有50000个入队请求添加到邮箱的邮件队列中。这里出队线程较慢,仅将其请求放在消息末尾。输出状态的最后一行是邮箱中有48450条消息,项目队列已满(500个项目),为了释放一个空间,我们需要扫描48447条消息-因为它们都是AsyncAdd,而不是AsyncGet。 scanTime是2-3ms(在我的机器上)-来自MailboxProcessor.Scan的大概时间。

当我们添加do!时,消息队列具有不同的形状(请参见输出):

[AsyncGet] messages cnt/scanned: 98/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 96/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 104/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 102/96, timestamp/scanTime: 1561/0


消息队列中的消息数〜入队线程数,因为每个线程现在都在等待。

从实验中我还无法理解,是当您将AsyncAdd更改为Add时,您仍然对MailboxProcessor进行了垃圾邮件处理:

[AsyncGet] messages cnt/scanned: 47551/47548, timestamp/scanTime: 3069/1
[AsyncGet] messages cnt/scanned: 47550/47547, timestamp/scanTime: 3070/1
[AsyncGet] messages cnt/scanned: 47549/47546, timestamp/scanTime: 3073/3
[AsyncGet] messages cnt/scanned: 47548/47545, timestamp/scanTime: 3077/2


但平均花费在扫描上的时间约为1毫秒-比使用AsyncReplyChannel更快。我的想法-这与AsyncReplyChannel的实现方式有关。它依赖于ManualResetEvent,因此在内部每个进程可能会有另一个此类事件的队列,并且每个AsyncGet在创建AsyncReplyChannel时都应扫描此队列。

关于f# - 火与不等待(不做!)与火与等待(不做!)取得了巨大的差异表现?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/57652524/

10-10 13:46