是否可以将 Outlet[A]
提升为 FlowOps[A, _]
?那就是如果我有这个:
import akka.NotUsed
import akka.stream.Outlet
import akka.stream.scaladsl.{FlowOps, GraphDSL, Source}
def filter(in: Outlet[Double])
(implicit b: GraphDSL.Builder[NotUsed]): Outlet[Double] = {
val in0: FlowOps[Double, NotUsed] = ???
val res = in0.grouped(8).statefulMapConcat[Double] { () =>
seq => seq.reverse
}
res
??? : Outlet[Double]
}
要使
grouped
调用像 in
是 Source
或 Flow
一样工作? 最佳答案
这只是缺少隐式的导入,然后 Outlet
可用于流操作:
def filter(in: Outlet[Double])
(implicit b: GraphDSL.Builder[NotUsed]): Outlet[Double] = {
import GraphDSL.Implicits._
import scala.collection.immutable.{Seq => ISeq}
val grouped: PortOps[ISeq[Double]] = in.grouped(8)
val flattened: PortOps[Double] = grouped.statefulMapConcat[Double] { () =>
seq => seq.reverse
}
flattened.outlet
}
关于scala - Akka 流 : Outlet to FlowOps,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37199126/