问题描述
我有一个Akka流源
,我想根据一个谓词将其分为两个源。
I have an Akka Streams Source
which I want to split into two sources according to a predicate.
例如有一个源(有意简化了类型):
E.g. having a source (types are simplified intentionally):
val source: Source[Either[Throwable, String], NotUsed] = ???
和两种方法:
def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ???
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???
我希望能够拆分来源
根据 _。isRight
谓词,将右侧部分传递给 handleSuccess
方法,将左侧部分传递给 handleFailure
方法。
I would like to be able to split the source
according to _.isRight
predicate and pass the right part to handleSuccess
method and left part to handleFailure
method.
我尝试使用 Broadcast
拆分器,但它需要接收器
结束。
I tried using Broadcast
splitter but it requires Sink
s at the end.
推荐答案
尽管您可以选择您要从中检索项目的 Source
无法创建 Source
并产生两个输出的 Source
就像您最终想要的那样。
Although you can choose which side of the Source
you want to retrieve items from it's not possible to create a Source
that that yields two outputs which is what it seems like you would ultimately want.
给出下面的 GraphStage
基本上将左和右值分成两个输出...
Given the GraphStage
below which essentially splits the left and right values into two outputs...
/**
* Fans out left and right values of an either
* @tparam L left value type
* @tparam R right value type
*/
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
import akka.stream.{Attributes, Outlet}
import akka.stream.stage.GraphStageLogic
override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var out0demand = false
var out1demand = false
setHandler(shape.in, new InHandler {
override def onPush(): Unit = {
if (out0demand && out1demand) {
grab(shape.in) match {
case Left(l) =>
out0demand = false
push(shape.out0, l)
case Right(r) =>
out1demand = false
push(shape.out1, r)
}
}
}
})
setHandler(shape.out0, new OutHandler {
@scala.throws[Exception](classOf[Exception])
override def onPull(): Unit = {
if (!out0demand) {
out0demand = true
}
if (out0demand && out1demand) {
pull(shape.in)
}
}
})
setHandler(shape.out1, new OutHandler {
@scala.throws[Exception](classOf[Exception])
override def onPull(): Unit = {
if (!out1demand) {
out1demand = true
}
if (out0demand && out1demand) {
pull(shape.in)
}
}
})
}
}
..您可以将它们路由到仅接收一侧:
.. you can route them to only receive one side:
val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s =>
import GraphDSL.Implicits._
val eitherFanOut = b.add(new EitherFanOut[Throwable, String])
s ~> eitherFanOut.in
eitherFanOut.out0 ~> Sink.ignore
SourceShape(eitherFanOut.out1)
})
Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf)
...或者可能更理想,将它们路由到两个单独的 Sink
s:
... or probably more desirable, route them to two seperate Sink
s:
val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))
val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>
import GraphDSL.Implicits._
val eitherFanOut = b.add(new EitherFanOut[Throwable, String])
s ~> eitherFanOut.in
eitherFanOut.out0 ~> l.in
eitherFanOut.out1 ~> r.in
ClosedShape
})
val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)
(导入和初始设置)
import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, InHandler, OutHandler}
import akka.stream._
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration.Duration
val classLoader = getClass.getClassLoader
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader)
implicit val materializer = ActorMaterializer()
val values: List[Either[Throwable, String]] = List(
Right("B"),
Left(new Throwable),
Left(new RuntimeException),
Right("B"),
Right("C"),
Right("G"),
Right("I"),
Right("F"),
Right("T"),
Right("A")
)
val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator)
这篇关于将Akka流源分成两个的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!