我对 Akka streams
中的扇出策略有点困惑,我读到了Broadcast
–(1 个输入,N 个输出)给定一个输入元素发射到每个输出,而 Balance
–(1 个输入,N 个输出)给定一个输入元素发射到其输出端口之一。
你能解释一下吗:
最佳答案
从文档中...广播向每个消费者发出(发送)元素。 balance 只发送给第一个可用的消费者。
broadcast
balance
从评论编辑:
根据您的要点,您应该创建两个 averageCarrierDelay 函数,每个 Z
和 F
一个。然后你可以看到发送给每个元素的所有元素。
val averageCarrierDelayZ =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"Z Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
val averageCarrierDelayF =
Flow[FlightDelayRecord]
.groupBy(30, _.uniqueCarrier)
.fold(("", 0, 0)){
(x: (String, Int, Int), y:FlightDelayRecord) => {
println(s"F Received Element: ${y}")
val count = x._2 + 1
val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
(y.uniqueCarrier, count, totalMins)
}
}.mergeSubstreams
编辑 2:为了将来检查事情,我建议为流阶段使用通用记录器,以便您可以看到发生了什么。
def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
这样做可以让您执行以下操作:
D ~> logElement[FlightDelayRecord]("F received: ") ~> F
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
通过这种方式,您可以检查图形区域是否存在您可能会或可能不会预料到的奇怪行为。