我一直在尝试设计一个系统,该系统允许在内存中同时表示大量并发用户。着手设计这个系统时,我立即想到了某种类似于Actor的基于actor的解决方案。

该系统必须在.NET中完成,因此我开始使用MailboxProcessor在F#中制作原型(prototype),但遇到了严重的性能问题。我最初的想法是为每个用户使用一个参与者(MailboxProcessor)来串行化一个用户的通信。

我隔离了一小段代码,该代码重现了我所看到的问题:

open System.Threading;
open System.Diagnostics;

type Inc() =

    let mutable n = 0;
    let sw = new Stopwatch()

    member x.Start() =
        sw.Start()

    member x.Increment() =
        if Interlocked.Increment(&n) >= 100000 then
            printf "UpdateName Time %A" sw.ElapsedMilliseconds

type Message
    = UpdateName of int * string

type User = {
    Id : int
    Name : string
}

[<EntryPoint>]
let main argv =

    let sw = Stopwatch.StartNew()
    let incr = new Inc()
    let mb =

        Seq.initInfinite(fun id ->
            MailboxProcessor<Message>.Start(fun inbox ->

                let rec loop user =
                    async {
                        let! m = inbox.Receive()

                        match m with
                        | UpdateName(id, newName) ->
                            let user = {user with Name = newName};
                            incr.Increment()
                            do! loop user
                    }

                loop {Id = id; Name = sprintf "User%i" id}
            )
        )
        |> Seq.take 100000
        |> Array.ofSeq

    printf "Create Time %i\n" sw.ElapsedMilliseconds
    incr.Start()

    for i in 0 .. 99999 do
        mb.[i % mb.Length].Post(UpdateName(i, sprintf "User%i-UpdateName" i));

    System.Console.ReadLine() |> ignore

    0

在我的四核i7上,仅创建100k actor大约需要800毫秒。然后将UpdateName消息提交给每个参与者,等待他们完成大约需要1.8秒。

现在,我意识到所有队列都有开销:在ThreadPool上进行设置,在MailboxProcessor中内部设置/重置AutoResetEvents等。但这真的是预期的性能吗?通过在MailboxProcessor上阅读MSDN和各种博客,我得到了一个想法,它可以成为erlang Actor 的亲戚,但是从糟糕的表现中,我看到这似乎在现实中不成立吗?

我还尝试了该代码的修改版本,该版本使用8个MailboxProcessor,每个代码中都有一个Map<int, User>映射,该映射用于通过id查找用户,它产生了一些改进,将UpdateName操作的总时间缩短到1.2秒。但是感觉仍然很慢,修改后的代码在这里:
open System.Threading;
open System.Diagnostics;

type Inc() =

    let mutable n = 0;
    let sw = new Stopwatch()

    member x.Start() =
        sw.Start()

    member x.Increment() =
        if Interlocked.Increment(&n) >= 100000 then
            printf "UpdateName Time %A" sw.ElapsedMilliseconds

type Message
    = CreateUser of int * string
    | UpdateName of int * string

type User = {
    Id : int
    Name : string
}

[<EntryPoint>]
let main argv =

    let sw = Stopwatch.StartNew()
    let incr = new Inc()
    let mb =

        Seq.initInfinite(fun id ->
            MailboxProcessor<Message>.Start(fun inbox ->

                let rec loop users =
                    async {
                        let! m = inbox.Receive()

                        match m with
                        | CreateUser(id, name) ->
                            do! loop (Map.add id {Id=id; Name=name} users)

                        | UpdateName(id, newName) ->
                            match Map.tryFind id users with
                            | None ->
                                do! loop users

                            | Some(user) ->
                                incr.Increment()
                                do! loop (Map.add id {user with Name = newName} users)
                    }

                loop Map.empty
            )
        )
        |> Seq.take 8
        |> Array.ofSeq

    printf "Create Time %i\n" sw.ElapsedMilliseconds

    for i in 0 .. 99999 do
        mb.[i % mb.Length].Post(CreateUser(i, sprintf "User%i-UpdateName" i));

    incr.Start()

    for i in 0 .. 99999 do
        mb.[i % mb.Length].Post(UpdateName(i, sprintf "User%i-UpdateName" i));

    System.Console.ReadLine() |> ignore

    0

所以我的问题在这里,我做错了吗?我是否误解了应该如何使用MailboxProcessor?还是这种性能是预期的。

更新:

因此,我在## fsharp @ irc.freenode.net上找到了一些人,这告诉我使用sprintf的速度非常慢,事实证明这是我大部分性能问题的出处。但是,删除上面的sprintf操作并为每个用户使用相同的名称,我仍然会花费大约400毫秒来控制这些操作,这确实很慢。

最佳答案



以及printfMapSeq和竞争您的全局可变Inc。而且您正在泄漏堆分配的堆栈帧。实际上,运行基准测试所花费的时间中只有一小部分与MailboxProcessor有关。



我对您的程序的性能并不感到惊讶,但是它并没有对MailboxProcessor的性能说太多。


MailboxProcessor在概念上类似于Erlang的一部分。您看到的糟糕表现是由于多种因素造成的,其中有些非常微妙,并且会影响任何此类程序。



我认为您做错了几件事。首先,您要解决的问题尚不清楚,因此听起来像XY problem问题。其次,您正在尝试对错误的事物进行基准测试(例如,您提示创建MailboxProcessor所需的微秒时间,但可能打算仅在建立TCP连接并花费数个数量级以上时才这样做)。第三,您编写了一个基准程序,该程序可以测量某些事物的性能,但是将您的观察结果归因于完全不同的事物。

让我们更详细地查看您的基准程序。在执行其他任何操作之前,让我们修复一些错误。您应该始终使用sw.Elapsed.TotalSeconds来测量时间,因为它更精确。您应该始终在异步工作流程中使用return!而不是do!重现,否则会泄漏堆栈帧。

我的初始时间是:

Creation stage: 0.858s
Post stage: 1.18s

接下来,让我们运行一个配置文件,以确保我们的程序确实将大部分时间都花在了F#MailboxProcessor上:
77%    Microsoft.FSharp.Core.PrintfImpl.gprintf(...)
 4.4%  Microsoft.FSharp.Control.MailboxProcessor`1.Post(!0)

显然不是我们所希望的。更抽象地思考,我们正在使用sprintf之类的东西来生成大量数据,然后将其应用,但是我们正在一起进行生成和应用。让我们分离出初始化代码:
let ids = Array.init 100000 (fun id -> {Id = id; Name = sprintf "User%i" id})
...
    ids
    |> Array.map (fun id ->
        MailboxProcessor<Message>.Start(fun inbox ->
...
            loop id
...
    printf "Create Time %fs\n" sw.Elapsed.TotalSeconds
    let fxs =
      [|for i in 0 .. 99999 ->
          mb.[i % mb.Length].Post, UpdateName(i, sprintf "User%i-UpdateName" i)|]
    incr.Start()
    for f, x in fxs do
      f x
...

现在我们得到:
Creation stage: 0.538s
Post stage: 0.265s

因此创建速度提高了60%,发布速度提高了4.5倍。

让我们尝试完全重写您的基准测试:
do
  for nAgents in [1; 10; 100; 1000; 10000; 100000] do
    let timer = System.Diagnostics.Stopwatch.StartNew()
    use barrier = new System.Threading.Barrier(2)
    let nMsgs = 1000000 / nAgents
    let nAgentsFinished = ref 0
    let makeAgent _ =
      new MailboxProcessor<_>(fun inbox ->
        let rec loop n =
          async { let! () = inbox.Receive()
                  let n = n+1
                  if n=nMsgs then
                    let n = System.Threading.Interlocked.Increment nAgentsFinished
                    if n = nAgents then
                      barrier.SignalAndWait()
                  else
                    return! loop n }
        loop 0)
    let agents = Array.init nAgents makeAgent
    for agent in agents do
      agent.Start()
    printfn "%fs to create %d agents" timer.Elapsed.TotalSeconds nAgents
    timer.Restart()
    for _ in 1..nMsgs do
      for agent in agents do
        agent.Post()
    barrier.SignalAndWait()
    printfn "%fs to post %d msgs" timer.Elapsed.TotalSeconds (nMsgs * nAgents)
    timer.Restart()
    for agent in agents do
      use agent = agent
      ()
    printfn "%fs to dispose of %d agents\n" timer.Elapsed.TotalSeconds nAgents

此版本在每个代理将增加共享计数器之前,期望每个代理使用nMsgs,从而大大降低了该共享计数器对性能的影响。该程序还将检查不同数量的代理的性能。在这台机器上,我得到:
Agents  M msgs/s
     1    2.24
    10    6.67
   100    7.58
  1000    5.15
 10000    1.15
100000    0.36

因此看来,您看到较低的msgs/s速度的部分原因是数量异常庞大(100,000)的代理。使用10-1,000个代理时,F#实现的速度是使用100,000个代理时的10倍以上。

因此,如果可以利用这种性能,那么您应该可以用F#编写整个应用程序,但是如果您需要更多的性能,我建议您使用其他方法。通过采用Disruptor之类的设计,您甚至不必牺牲使用F#(并且您当然可以将其用于原型(prototype)设计)。在实践中,我发现在.NET上进行序列化的时间往往比在F#async和MailboxProcessor中花费的时间大得多。

10-08 14:31