本文介绍了Scala actor 可以同时处理多个消息吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

对我最近一个问题的回复表明参与者一次处理一个消息.这是真的?我没有看到任何明确说明的内容(在Scala 编程中),其中包含以下代码段(第 593 页)

The reply to a recent question of mine indicated that an actor processed its messages one at a time. Is this true? I see nothing that explicitly says that (in Programming in Scala), which contains the following snippet (pp. 593)

如果[react 方法] 找到可以处理的消息,[它] 将安排该消息的处理稍后执行并抛出异常>

(强调我自己的).两个相关(且相互排斥)的问题:

(Emphasis my own). Two related (and mutually exclusive) questions:

  1. 假设一个角色可以模拟处理多条消息,我如何强制一个角色一次处理一条消息(如果这是我想要做的)?(使用receive?)
  2. 假设一个actor一次只处理一个消息,我如何最好地实现一个actor,它实际上可以同时处理消息
  1. Assuming an actor could process multiple messages simulatenously, how can I force an actor to process messages 1 at a time (if this is what I wish to do)? (using receive?)
  2. Assuming an actor processes messages one at a time, how would I best implement an actor which in fact could process messages concurrently

做一些测试似乎证明我错了,演员确实是连续的.所以这是我需要回答的问题 #2

推荐答案

Actor 一次处理一条消息.处理多条消息的经典模式是让一个协调器 Actor 前端用于一组消费者 Actor.如果您使用 react,那么消费者池可能会很大,但仍然只会使用少量 JVM 线程.下面是一个示例,其中我创建了一个包含 10 个消费者和一个协调员的池.

Actors process one message at a time. The classic pattern to process multiple messages is to have one coordinator actor front for a pool of consumer actors. If you use react then the consumer pool can be large but will still only use a small number of JVM threads. Here's an example where I create a pool of 10 consumers and one coordinator to front for them.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) =>
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop =>
           consumers foreach {_ ! Stop}
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

此代码测试以查看哪个消费者可用并向该消费者发送请求.替代方案是随机分配给消费者或使用循环调度程序.

This code tests to see which consumer is available and sends a request to that consumer. Alternatives are to just randomly assign to consumers or to use a round robin scheduler.

根据您在做什么,Scala 的 Futures 可能会更好地为您服务.例如,如果你真的不需要演员,那么上述所有机制都可以写成

Depending on what you are doing, you might be better served with Scala's Futures. For instance, if you don't really need actors then all of the above machinery could be written as

import scala.actors.Futures._

def transform(payload : String) = {
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

这篇关于Scala actor 可以同时处理多个消息吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-17 21:44
查看更多