是否可以将 Outlet[A] 提升为 FlowOps[A, _] ?那就是如果我有这个:

import akka.NotUsed
import akka.stream.Outlet
import akka.stream.scaladsl.{FlowOps, GraphDSL, Source}

def filter(in: Outlet[Double])
          (implicit b: GraphDSL.Builder[NotUsed]): Outlet[Double] = {
  val in0: FlowOps[Double, NotUsed] = ???
  val res = in0.grouped(8).statefulMapConcat[Double] { () =>
    seq => seq.reverse
  }
  res
  ??? : Outlet[Double]
}

要使 grouped 调用像 inSourceFlow 一样工作?

最佳答案

这只是缺少隐式的导入,然后 Outlet 可用于流操作:

def filter(in: Outlet[Double])
            (implicit b: GraphDSL.Builder[NotUsed]): Outlet[Double] = {
    import GraphDSL.Implicits._
    import scala.collection.immutable.{Seq => ISeq}
    val grouped: PortOps[ISeq[Double]] = in.grouped(8)
    val flattened: PortOps[Double] = grouped.statefulMapConcat[Double] { () =>
      seq => seq.reverse
    }
    flattened.outlet
  }

关于scala - Akka 流 : Outlet to FlowOps,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37199126/

10-11 10:49