我正在尝试查找有关如何使用TryScan的示例,但尚未找到任何示例,您能帮我吗?

我想做什么(相当简单的示例):我有一个MailboxProcessor接受
两种消息。

  • 第一个GetState返回当前状态。GetState消息发送非常频繁
  • 其他UpdateState非常昂贵(耗时),例如从互联网下载一些内容,然后相应地更新状态。UpdateState很少被调用。

  • 我的问题是-邮件GetState被阻止,等到之前的UpdateState被送达。这就是为什么我尝试使用TryScan处理所有GetState消息,但是没有运气的原因。

    我的示例代码:
    type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
    let mbox = MailboxProcessor.Start(fun mbox ->
                 let rec loop state = async {
                    // this TryScan doesn't work as expected
                    // it should process GetState messages and then continue
                    mbox.TryScan(fun m ->
                        match m with
                        | GetState(chnl) ->
                            printfn "G processing TryScan"
                            chnl.Reply(state)
                            Some(async { return! loop state})
                        | _ -> None
                    ) |> ignore
    
                    let! msg = mbox.Receive()
                    match msg with
                    | UpdateState ->
                        printfn "U processing"
                        // something very time consuming here...
                        async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                        return! loop (state+1)
                    | GetState(chnl) ->
                        printfn "G processing"
                        chnl.Reply(state)
                        return! loop state
                 }
                 loop 0
    )
    
    [async { for i in 1..10 do
              printfn " U"
              mbox.Post(UpdateState)
              async { do! Async.Sleep(200) } |> Async.RunSynchronously
    };
    async { // wait some time so that several `UpdateState` messages are fired
            async { do! Async.Sleep(500) } |> Async.RunSynchronously
            for i in 1..20 do
              printfn "G"
              printfn "%d" (mbox.PostAndReply(GetState))
    }] |> Async.Parallel |> Async.RunSynchronously
    

    如果尝试运行该代码,您将看到GetState消息几乎未处理,因为它等待结果。另一方面,UpdateState只是一劳永逸,从而有效地阻止了状态的获取。

    编辑

    当前适用于我的解决方案是以下解决方案:
    type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
    let mbox = MailboxProcessor.Start(fun mbox ->
                 let rec loop state = async {
                    // this TryScan doesn't work as expected
                    // it should process GetState messages and then continue
                    let! res = mbox.TryScan((function
                        | GetState(chnl) -> Some(async {
                                chnl.Reply(state)
                                return state
                            })
                        | _ -> None
                    ), 5)
    
                    match res with
                    | None ->
                        let! msg = mbox.Receive()
                        match msg with
                            | UpdateState ->
                                async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                                return! loop (state+1)
                            | _ -> return! loop state
                    | Some n -> return! loop n
                 }
                 loop 0
    )
    

    对评论的 react :与其他MailboxProcessor或并行执行ThreadPoolUpdateState的想法很棒,但是我目前不需要它。
    我要做的就是处理所有GetState消息,然后处理其他消息。我不在乎在处理UpdateState期间代理是否被阻止。

    我将向您展示输出中存在的问题:
    // GetState messages are delayed 500 ms - see do! Async.Sleep(500)
    // each UpdateState is sent after 200ms
    // each GetState is sent immediatelly! (not real example, but illustrates the problem)
     U            200ms   <-- issue UpdateState
    U processing          <-- process UpdateState, it takes 1sec, so other
     U            200ms       5 requests are sent; sent means, that it is
     U            200ms       fire-and-forget message - it doesn't wait for any result
                              and therefore it can send every 200ms one UpdateState message
    G                     <-- first GetState sent, but waiting for reply - so all
                              previous UpdateState messages have to be processed! = 3 seconds
                              and AFTER all the UpdateState messages are processed, result
                              is returned and new GetState can be sent.
     U            200ms
     U            200ms       because each UpdateState takes 1 second
     U            200ms
    U processing
     U
     U
     U
     U
    U processing
    G processing          <-- now first GetState is processed! so late? uh..
    U processing          <-- takes 1sec
    3
    G
    U processing          <-- takes 1sec
    U processing          <-- takes 1sec
    U processing          <-- takes 1sec
    U processing          <-- takes 1sec
    U processing          <-- takes 1sec
    U processing          <-- takes 1sec
    G processing          <-- after MANY seconds, second GetState is processed!
    10
    G
    G processing
    // from this line, only GetState are issued and processed, because
    // there is no UpdateState message in the queue, neither it is sent
    

    最佳答案

    在这种情况下,我认为TryScan方法不会对您有所帮助。它允许您指定等待消息时要使用的超时。收到某些消息后,它将开始处理该消息(忽略超时)。

    例如,如果您想等待一些特定的消息,但是每秒(在等待时)执行一些其他检查,则可以编写:

    let loop () = async {
      let! res = mbox.TryScan(function
        | ImportantMessage -> Some(async {
              // process message
              return 0
            })
        | _ -> None)
      match res with
      | None ->
           // perform some check & continue waiting
           return! loop ()
      | Some n ->
           // ImportantMessage was received and processed
    }
    

    在处理UpdateState消息时,如何避免阻塞邮箱处理器?邮箱处理器是(逻辑上)单线程的-您可能不想取消对UpdateState消息的处理,因此最好的选择是在后台开始处理它,然后等待处理完成。然后,处理UpdateState的代码可以将一些消息发送回邮箱(例如UpdateStateCompleted)。

    这是一个草图,看起来可能是这样:
    let rec loop (state) = async {
      let! msg = mbox.Receive()
      match msg with
      | GetState(repl) ->
          repl.Reply(state)
          return! scanning state
      | UpdateState ->
          async {
            // complex calculation (runs in parallel)
            mbox.Post(UpdateStateCompleted newState) }
          |> Async.Start
      | UpdateStateCompleted newState ->
          // Received new state from background workflow
          return! loop newState }
    

    现在,后台任务正在并行运行,您需要注意可变状态。另外,如果您发送UpdateState消息的速度超过了处理它们的速度,那么您将会遇到麻烦。例如,当您已经在处理前一个请求时,可以通过忽略或排队请求来解决此问题。

    关于concurrency - 如何在F#中正确使用TryScan,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/4880171/

    10-13 08:12