open FSharpx.Control
/// Runs the enqueue/dequeue experiment against a `BlockingQueueAgent` created
/// with the argument 500: 100 producers each issue 500 `AsyncAdd` calls
/// whose results are not awaited, then five `None` sentinels are added; five
/// consumers keep reading until each of them receives a `None`.
let test () =
    let agent = new BlockingQueueAgent<int option>(500)

    // A single producer: 500 adds, none of them awaited (no `do!`).
    let producer () =
        async {
            for n in 1 .. 500 do
                //do! agent.AsyncAdd (Some n) // less than a second with do!
                agent.AsyncAdd (Some n) // it takes about 20 seconds without do!
                //agent.Add (Some n) // This one takes about 9 seconds
                //printfn "=> %d" n
        }

    // Background workflow: run all producers in parallel, then add the sentinels.
    async {
        let producers = [ for _ in 1 .. 100 -> producer () ]
        do! producers |> Async.Parallel |> Async.Ignore
        for _ in 1 .. 5 do
            agent.Add None
    }
    |> Async.Start

    // A consumer loops on AsyncGet until it sees the `None` sentinel.
    let rec consumer () =
        async {
            let! item = agent.AsyncGet()
            match item with
            | Some v ->
                //printfn "<= %d" v
                return! consumer ()
            | None -> printfn "Done"
        }

    // Block the calling thread until all five consumers have finished.
    [ for _ in 1 .. 5 -> consumer () ]
    |> Async.Parallel
    |> Async.Ignore
    |> Async.RunSynchronously
接着这个问题(this question)继续讨论。下面是基于您的代码所做的实验:
module Test.T1
open System
open System.Collections.Generic
open System.Diagnostics
/// Messages accepted by the mailbox agent built by `createQueue`.
type Msg<'T> =
/// A value plus a unit reply channel; posted by `addWithReply` through `PostAndAsyncReply`.
| AsyncAdd of 'T * AsyncReplyChannel<unit>
/// A value with no reply channel; posted by `addAndForget` through `Post`.
| Add of 'T
/// A request carrying a reply channel of type 'T; posted by `get` through `PostAndAsyncReply`.
| AsyncGet of AsyncReplyChannel<'T>
/// Shared stopwatch; its ElapsedMilliseconds is read for the diagnostic timestamps.
let sw = Stopwatch()
/// Count of messages inspected by the Scan in `fullQueue`; reset to 0 in `dequeueAndContinue`.
let mutable scanned = 0
/// Stopwatch reading (ms) stored when `fullQueue` is entered or `runningQueue` receives a message.
let mutable scanTimeStart = 0L
/// Starts a mailbox agent that wraps a FIFO queue bounded by `maxLength`.
///
/// The agent is a small state machine; `chooseState` picks the next state
/// from the current item count:
///   * empty   (count = 0)          - only `AsyncAdd` / `Add` are taken from the mailbox;
///   * running (count < maxLength)  - the next message of any kind is taken;
///   * full    (otherwise)          - only `AsyncGet` is taken.
///
/// Every served `AsyncGet` prints the mailbox length, the number of messages
/// inspected by the full-state scan, and timings read from the stopwatch `sw`.
/// Returns the started `MailboxProcessor`.
let createQueue maxLength = MailboxProcessor.Start(fun inbox ->
    let queue = new Queue<'T>()

    // Empty: nothing to hand out, so leave AsyncGet requests in the mailbox.
    let rec emptyQueue() =
        inbox.Scan(fun msg ->
            match msg with
            | AsyncAdd(value, reply) -> Some(enqueueAndContinueWithReply(value, reply))
            | Add(value) -> Some(enqueueAndContinue(value))
            | _ -> None)

    // Full: no room for new items, so skip over add messages until an AsyncGet is found.
    and fullQueue() =
        scanTimeStart <- sw.ElapsedMilliseconds
        inbox.Scan(fun msg ->
            // Count every message the scan looks at, matching or not.
            scanned <- scanned + 1
            match msg with
            | AsyncGet(reply) -> Some(dequeueAndContinue(reply))
            | _ -> None)

    // Running: both adding and getting are possible, so take messages in arrival order.
    and runningQueue() = async {
        let! msg = inbox.Receive()
        scanTimeStart <- sw.ElapsedMilliseconds
        match msg with
        | AsyncAdd(value, reply) -> return! enqueueAndContinueWithReply(value, reply)
        | Add(value) -> return! enqueueAndContinue(value)
        | AsyncGet(reply) -> return! dequeueAndContinue(reply) }

    // Acknowledge the sender, store the item, then re-evaluate the state.
    and enqueueAndContinueWithReply (value, reply) = async {
        reply.Reply()
        queue.Enqueue(value)
        return! chooseState() }

    // Store the item (the sender is not waiting for a reply), then re-evaluate the state.
    and enqueueAndContinue (value) = async {
        queue.Enqueue(value)
        return! chooseState() }

    // Print diagnostics, hand the oldest item to the requester, then re-evaluate the state.
    and dequeueAndContinue (reply) = async {
        let timestamp = sw.ElapsedMilliseconds
        printfn "[AsyncGet] messages cnt/scanned: %d/%d, timestamp/scanTime: %d/%d" inbox.CurrentQueueLength scanned timestamp (timestamp - scanTimeStart)
        scanned <- 0
        reply.Reply(queue.Dequeue())
        return! chooseState() }

    and chooseState() =
        if queue.Count = 0 then emptyQueue()
        elif queue.Count < maxLength then runningQueue()
        else fullQueue()

    // A freshly created queue holds no items.
    emptyQueue())
/// Queue agent used by the experiment: `int option` items, created with max length 500.
let mb = createQueue<int option> 500

/// Posts an `AsyncAdd` for `v`; the returned async yields the agent's reply.
let addWithReply v =
    mb.PostAndAsyncReply(fun replyChannel -> AsyncAdd(v, replyChannel))

/// Posts `Add v`; no reply channel is involved.
let addAndForget v = Add v |> mb.Post

/// Posts an `AsyncGet`; the returned async yields the value sent on the reply channel.
let get() =
    mb.PostAndAsyncReply(fun replyChannel -> AsyncGet replyChannel)
/// Runs the experiment against the queue agent `mb`: 100 producers each call
/// `addWithReply` 500 times without awaiting the result, five `None`
/// sentinels follow, and five consumers call `get` until each receives `None`.
/// Prints the total elapsed time at the end. `args` is not used.
let main args =
    // The stopwatch must be running, otherwise every ElapsedMilliseconds
    // reading (here and inside the agent) stays at 0. Start() does nothing
    // if the stopwatch is already running.
    sw.Start()

    let enqueue() = async {
        for i = 1 to 500 do
            //do! addWithReply (Some i) // less than a second with do!
            // Deliberately not awaited: this is the case being measured.
            addWithReply (Some i) // it takes about 20 seconds without do!
            //addAndForget(Some i) // This one takes about 9 seconds
            //printfn "=> %d" i
        }

    // Background workflow: run all producers in parallel, then post the sentinels.
    async {
        do! [ for i = 1 to 100 do yield enqueue() ]
            |> Async.Parallel |> Async.Ignore
        for i = 1 to 5 do addAndForget None
    } |> Async.Start

    // A consumer keeps requesting items until it receives the `None` sentinel.
    let rec dequeue() =
        async {
            let! m = get()
            match m with
            | Some v ->
                //printfn "<= %d" v
                return! dequeue()
            | None ->
                printfn "Done"
        }

    // Block until all five consumers have finished.
    [ for i = 1 to 5 do yield dequeue() ]
    |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
    printfn "Totally ellapsed: %dms" sw.ElapsedMilliseconds
[AsyncGet] messages cnt/scanned: 48453/48450, timestamp/scanTime: 3755/6
[AsyncGet] messages cnt/scanned: 48452/48449, timestamp/scanTime: 3758/3
[AsyncGet] messages cnt/scanned: 48451/48448, timestamp/scanTime: 3761/3
[AsyncGet] messages cnt/scanned: 48450/48447, timestamp/scanTime: 3764/3
如您所见,如果不使用 do!,您基本上会把全部 50000 个入队请求都添加到邮箱(mailbox)的消息队列中。这里的出队线程较慢,它们的请求只能排在这些消息的末尾。输出的最后一行表明:邮箱中有 48450 条消息,项目队列已满(500 个项目),为了腾出一个空位,我们需要扫描 48447 条消息——因为它们全都是 AsyncAdd,而不是 AsyncGet。scanTime 为 2-3 毫秒(在我的机器上),即 MailboxProcessor.Scan 的大致耗时。
[AsyncGet] messages cnt/scanned: 98/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 96/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 104/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 102/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 47551/47548, timestamp/scanTime: 3069/1
[AsyncGet] messages cnt/scanned: 47550/47547, timestamp/scanTime: 3070/1
[AsyncGet] messages cnt/scanned: 47549/47546, timestamp/scanTime: 3073/3
[AsyncGet] messages cnt/scanned: 47548/47545, timestamp/scanTime: 3077/2
