本文介绍了async {... AsyncAdd ...}和async {... AsyncAdd ...}有什么区别?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在以下代码中,do! ag.AsyncAdd (Some i)ag.AsyncAdd (Some i)(在功能enqueue()中)均起作用.它们之间有什么区别?看来do! ...会使入队和出队呼叫更加混杂吗?怎么样?

In the following code, both do! ag.AsyncAdd (Some i) or ag.AsyncAdd (Some i) (in the function enqueue()) work. What's the difference between them? It seems do! ... will make enqueuing and dequeuing calls more mixed? How?

open FSharpx.Control

let test () =
    let ag = new BlockingQueueAgent<int option>(500)

    let enqueue() = async { 
        for i = 1 to 15 do 
            // ag.AsyncAdd (Some i) // works too
            do! ag.AsyncAdd (Some i) 
            printfn "=> %d" i }

    async {
        do! [ for i = 1 to 10 do yield enqueue() ] 
            |> Async.Parallel |> Async.Ignore
        for i = 1 to 5 do ag.Add None
    } |> Async.Start

    let rec dequeue() =
        async {
            let! m = ag.AsyncGet()
            match m with
            | Some v ->
                printfn "<= %d" v
                return! dequeue()
            | None -> 
                printfn "Done" 
        }

    [ for i = 1 to 5 do yield dequeue() ] 
    |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
    0

推荐答案

来自 FSharpx源代码(请参见评论):

    /// Asynchronously adds item to the queue. The operation ends when
    /// there is a place for the item. If the queue is full, the operation
    /// will block until some items are removed.
    member x.AsyncAdd(v:'T, ?timeout) = 
      agent.PostAndAsyncReply((fun ch -> AsyncAdd(v, ch)), ?timeout=timeout)

当不使用do!时,如果队列已满(在构造函数中声明的队列中有500个项目),则不会阻塞排队线程.因此,当您将循环更改为更高的数目时,您将所有入队线程的所有迭代中类型为AsyncAdd的消息(在后台FSharpx使用MailboxProcessor-检查此类的文档)发送到MailboxProcessor队列中.这会减慢另一个操作agent.Scan:

When you do not use do!, you do not block enqueue thread in case if queue is full (500 items in queue as you state in constructor). So, when you changed loops to higher number, you spammed MailboxProcessor queue with messages (behind the scene FSharpx uses MailboxProcessor - check docs for this class) of type AsyncAdd from all iterations of all enqueue thread. This slows down another operation, agent.Scan:

        and fullQueue() = 
            agent.Scan(fun msg ->
              match msg with 
              | AsyncGet(reply) -> Some(dequeueAndContinue(reply))
              | _ -> None )

  • 因为队列中有很多AsyncAdd和AsyncGet.
  • 以防万一,当您放进去!在AsyncAdd之前,当队列中有500个项目时,您的排队线程将被阻塞,并且不会为MailboxProcessor生成其他消息,因此agent.Scan将快速运行.当出队线程接收一个项目并且它们的数量变为499时,新的入队线程将被唤醒并添加新的项目,然后转到循环的下一个迭代,将新的AsyncAdd消息放入MailboxProcessor中,然后再次进入睡眠状态,直到出队时刻为止.因此,MailboxProcessor不会被一个入队线程的所有迭代的消息AsyncAdd所淹没.注意:项目队列和MailboxProcessor消息队列是不同的队列.

    In case, when you put do! before AsyncAdd, you enqueue threads will be blocked at moment when there are 500 items in queue and no additional messages would be generated for MailboxProcessor, thus agent.Scan will work fast. When dequeue thread takes an item and the number of them becomes 499, new enqueue thread awaiks and adds new item and then goes to next iteration of a loop, put new AsyncAdd message into MailboxProcessor and again, goes to sleep till moment of dequeue. Thus, MailboxProcessor is not spammed with messages AsyncAdd of all iterations of one enqueue thread. Note: queue of items and queue of MailboxProcessor messages are different queues.

    这篇关于async {... AsyncAdd ...}和async {... AsyncAdd ...}有什么区别?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-19 01:12