我对 Akka streams 中的扇出策略有点困惑,我读到了Broadcast –(1 个输入,N 个输出)给定一个输入元素发射到每个输出,而 Balance –(1 个输入,N 个输出)给定一个输入元素发射到其输出端口之一。

你能解释一下吗:

  • 如何平衡多个消费者?
  • 短语“发射到其输出端口之一”的含义
  • 端口是否与下游相同?
  • 'Balance' 是否代表将输入流复制到几个输出分区中
  • “平衡是使图形能够分开并复制下游订阅者的多个实例以处理卷”是什么意思?
  • 最佳答案

    从文档中...广播向每个消费者发出(发送)元素。 balance 只发送给第一个可用的消费者。

    broadcast



    balance



    从评论编辑:

    根据您的要点,您应该创建两个 averageCarrierDelay 函数,每个 ZF 一个。然后你可以看到发送给每个元素的所有元素。

    val averageCarrierDelayZ =
        Flow[FlightDelayRecord]
          .groupBy(30, _.uniqueCarrier)
            .fold(("", 0, 0)){
              (x: (String, Int, Int), y:FlightDelayRecord) => {
                println(s"Z Received Element: ${y}")
                val count = x._2 + 1
                val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
                (y.uniqueCarrier, count, totalMins)
              }
            }.mergeSubstreams
    
    
    val averageCarrierDelayF =
        Flow[FlightDelayRecord]
          .groupBy(30, _.uniqueCarrier)
            .fold(("", 0, 0)){
              (x: (String, Int, Int), y:FlightDelayRecord) => {
                println(s"F Received Element: ${y}")
                val count = x._2 + 1
                val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
                (y.uniqueCarrier, count, totalMins)
              }
            }.mergeSubstreams
    

    编辑 2:为了将来检查事情,我建议为流阶段使用通用记录器,以便您可以看到发生了什么。
    def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
    

    这样做可以让您执行以下操作:
    D ~> logElement[FlightDelayRecord]("F received: ") ~> F
    D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
    

    通过这种方式,您可以检查图形区域是否存在您可能会或可能不会预料到的奇怪行为。

    10-08 02:56
    查看更多