问题描述
我正在使用Akka流创建一个简单的邮件传递服务。该服务就像邮件传递一样,其中来源中的元素包括目的地
和内容
,例如:
I'm creating a simple message delivery service using Akka stream. The service is just like mail delivery, where elements from source include destination
and content
like:
case class Message(destination: String, content: String)
,该服务应根据目的地
字段将邮件传递到适当的接收器。我创建了一个 DeliverySink
类,使其具有名称:
and the service should deliver the messages to appropriate sink based on the destination
field. I created a DeliverySink
class to let it have a name:
case class DeliverySink(name: String, sink: Sink[String, Future[Done]])
现在,我实例化了两个 DeliverySink
,让我将它们称为 sinkX
和 sinkY
,并根据其名称创建了地图。实际上,我想提供一个接收器名称列表,并且该列表应该是可配置的。
Now, I instantiated two DeliverySink
, let me call them sinkX
and sinkY
, and created a map based on their name. In practice, I want to provide a list of sink names and the list should be configurable.
我面临的挑战是如何基于以下内容动态选择合适的接收器目的地
字段。
The challenge I'm facing is how to dynamically choose an appropriate sink based on the destination
field.
最终,我想映射 Flow [Message]
到水槽。我试过:
Eventually, I want to map Flow[Message]
to a sink. I tried:
val sinkNames: List[String] = List("sinkX", "sinkY")
val sinkMapping: Map[String, DeliverySink] =
sinkNames.map { name => name -> DeliverySink(name, ???)}.toMap
Flow[Message].map { msg => msg.content }.to(sinks(msg.destination).sink)
不起作用,因为我们无法在地图外引用 msg
...
but, obviously this doesn't work because we can't reference msg
outside of map...
我想这不是正确的方法。我还考虑过将 filter
与 broadcast
一起使用,但是如果目标扩展到100,则无法键入所有路由。什么是实现我的目标的正确方法?
I guess this is not a right approach. I also thought about using filter
with broadcast
, but if the destination scales to 100, I cannot type every routing. What is a right way to achieve my goal?
Ideally, I would like to make destinations dynamic. So, I cannot statically type all destinations in filter or routing logic. If a destination sink has not been connected, it should create a new sink dynamically too.
推荐答案
如果必须使用多个接收器
将直接满足您的现有要求。如果在每个接收器
之前附加适当的 Flow.filter
,则他们只会收到适当的消息。
Sink.combine
would directly suite your existing requirements. If you attach an appropriate Flow.filter
before each Sink
then they'll only receive the appropriate messages.
不要使用多个接收器
总的来说,我认为设计不好流的结构和内容包含业务逻辑。在普通scala / java代码中,您的流应该是薄薄的单板,用于背压并发的业务逻辑。
In general I think it is bad design to have the structure, and content, of streams contain business logic. Your stream should be a thin veneer for back-pressured concurrency on top of business logic which is in ordinary scala/java code.
在这种特殊情况下,我认为它将最好将目标路由包装在单个接收器中,并且逻辑应在单独的函数中实现。例如:
In this particular case, I think it would be best to wrap your destination routing inside of a single Sink and the logic should be implemented inside of a separate function. For example:
val routeMessage : (Message) => Unit =
(message) =>
if(message.destination equalsIgnoreCase "stdout")
System.out println message.content
else if(message.destination equalsIgnoreCase "stderr")
System.err println message.content
val routeSink : Sink[Message, _] = Sink foreach routeMessage
请注意,现在测试 routeMessage
并不容易,因为它不在流中:我不需要任何Akka测试工具东西来测试routeMessage。如果我的并发设计要更改,我还可以将函数移至 Future
或 Thread
。
Note how much easier it is to now test my routeMessage
since it isn't inside of the stream: I don't need any akka testkit "stuff" to test routeMessage. I can also move the function to a Future
or a Thread
if my concurrency design were to change.
许多目的地
如果目的地很多,可以使用地图
。例如,假设您正在将消息发送到AmazonSQS。您可以定义一个将队列名称转换为队列URL的函数,并使用该函数维护已经创建的名称的映射:
If you have many destinations you can use a Map
. Suppose, for example, you are sending your messages to AmazonSQS. You could define a function to convert a Queue Name to Queue URL and use that function to maintain a Map of already created names:
type QueueName = String
val nameToRequest : (QueueName) => CreateQueueRequest = ??? //implementation unimportant
type QueueURL = String
val nameToURL : (AmazonSQS) => (QueueName) => QueueURL = {
val nameToURL = mutable.Map.empty[QueueName, QueueURL]
(sqs) => (queueName) => nameToURL.get(queueName) match {
case Some(url) => url
case None => {
sqs.createQueue(nameToRequest(queueName))
val url = sqs.getQueueUrl(queueName).getQueueUrl()
nameToURL put (queueName, url)
url
}
}
}
现在您可以在单个Sink内使用此非流函数:
Now you can use this non-stream function inside of a singular Sink:
val sendMessage : (AmazonSQS) => (Message) => Unit =
(sqs) => (message) =>
sqs sendMessage {
(new SendMessageRequest())
.withQueueUrl(nameToURL(sqs)(message.destination))
.withMessageBody(message.content)
}
val sqs : AmazonSQS = ???
val messageSink = Sink foreach sendMessage(sqs)
侧注意
对于目的地
,您可能希望使用 String以外的其他值
。 通常更好,因为它们可以与case语句一起使用,并且您将获得有用的编译器错误,如果您错过其中一种可能性:
For destination
you probably want to use something other than String
. A coproduct is usually better because they can be used with case statements and you'll get helpful compiler errors if you miss one of the possibilities:
sealed trait Destination
object Out extends Destination
object Err extends Destination
object SomethingElse extends Destination
case class Message(destination: Destination, content: String)
//This function won't compile because SomethingElse doesn't have a case
val routeMessage : (Message) => Unit =
(message) => message.destination match {
case Out =>
System.out.println(message.content)
case Err =>
System.err.println(message.content)
}
这篇关于Akka流-根据流中的元素选择接收器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!