Splitting an Akka Streams Source into two

Question

I have an Akka Streams Source that I want to split into two sources according to a predicate.

For example, given a source (types are intentionally simplified):

val source: Source[Either[Throwable, String], NotUsed] = ???

and two methods:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ???
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???

I would like to split the source according to the _.isRight predicate, passing the right part to the handleSuccess method and the left part to the handleFailure method.

I tried using a Broadcast splitter, but it requires Sinks at the end.

Recommended answer

Although you can choose which side of the Source you want to retrieve items from, it is not possible to create a Source that yields two outputs, which seems to be what you ultimately want.
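As a side note, a built-in operator gets close to "one Source, one side channel". The sketch below assumes Akka 2.5.10 or later (where `divertTo` and `divertToMat` are available) and the same `ActorMaterializer` setup used in this post. The names `DivertToSketch`, `failureSink`, `successes` and `demo` are hypothetical and only serve the illustration. Left values are diverted to a sink while the right values remain available as an ordinary Source, which could then be handed to something like handleSuccess.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DivertToSketch {
  type Elem = Either[Throwable, String]

  // Hypothetical failure-side sink: unwraps Left values and collects them.
  val failureSink: Sink[Elem, Future[immutable.Seq[Throwable]]] =
    Flow[Elem].collect { case Left(t) => t }.toMat(Sink.seq[Throwable])(Keep.right)

  // Left values are diverted to failureSink; Right values stay in the
  // returned Source. The materialized value is the Future of failures.
  def successes(source: Source[Elem, NotUsed]): Source[String, Future[immutable.Seq[Throwable]]] =
    source
      .divertToMat(failureSink, _.isLeft)(Keep.right)
      .collect { case Right(s) => s }

  // Runs the sketch on a small sample and returns (successes, failures).
  def demo(): (immutable.Seq[String], immutable.Seq[Throwable]) = {
    implicit val system: ActorSystem = ActorSystem("DivertToSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val values: List[Elem] =
        List(Right("A"), Left(new RuntimeException("boom")), Right("B"))
      val (failuresF, successesF) =
        successes(Source(values)).toMat(Sink.seq[String])(Keep.both).run()
      (Await.result(successesF, 10.seconds), Await.result(failuresF, 10.seconds))
    } finally system.terminate()
  }
}
```

If the materialized value of the failure sink is not needed, plain `divertTo(sink, predicate)` should work the same way and keeps the Source's own materialized value.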

Given the GraphStage below, which essentially splits the left and right values into two outputs...

/**
  * Fans out the left and right values of an Either to two separate outlets.
  * Upstream is pulled only when both outlets have demand, because the side
  * of the next element is not known in advance.
  * @tparam L left value type
  * @tparam R right value type
  */
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
  import akka.stream.Attributes
  import akka.stream.stage.GraphStageLogic

  override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    // Whether each outlet currently has unfulfilled demand
    var out0demand = false
    var out1demand = false

    // Pull upstream only once both outlets are ready to receive an element
    private def pullIfBothDemand(): Unit =
      if (out0demand && out1demand) pull(shape.in)

    setHandler(shape.in, new InHandler {
      override def onPush(): Unit = {
        // Upstream is only pulled while both outlets have demand, so either
        // outlet can accept the element; clear the flag of the side that got it
        grab(shape.in) match {
          case Left(l) =>
            out0demand = false
            push(shape.out0, l)
          case Right(r) =>
            out1demand = false
            push(shape.out1, r)
        }
      }
    })

    setHandler(shape.out0, new OutHandler {
      override def onPull(): Unit = {
        out0demand = true
        pullIfBothDemand()
      }
    })

    setHandler(shape.out1, new OutHandler {
      override def onPull(): Unit = {
        out1demand = true
        pullIfBothDemand()
      }
    })
  }
}

... you can route them so that only one side is received:

val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s =>
  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> Sink.ignore

  SourceShape(eitherFanOut.out1)
})

Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf)

... or, probably more desirable, route them to two separate Sinks:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> l.in
  eitherFanOut.out1 ~> r.in

  ClosedShape
})


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)
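For comparison, the same two-sink wiring can probably be expressed without a custom stage by using the built-in `Partition` junction. This is only a sketch under a few assumptions: the `ActorMaterializer` setup used in this post, and hypothetical names (`PartitionSketch`, `graph`, `demo`) that are not part of the original answer. `Partition` routes each element to an outlet index chosen by a function, and a `collect` on each branch unwraps the Either.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Partition, RunnableGraph, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PartitionSketch {
  type Elem = Either[Throwable, String]

  // Builds a closed graph that sends Left values to one Sink.seq and
  // Right values to another; materializes both result Futures.
  def graph(source: Source[Elem, NotUsed])
      : RunnableGraph[(Future[immutable.Seq[Throwable]], Future[immutable.Seq[String]])] =
    RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[Throwable], Sink.seq[String])(Keep.both) { implicit b => (l, r) =>
        import GraphDSL.Implicits._

        // Outlet 0 receives Left values, outlet 1 receives Right values
        val partition = b.add(Partition[Elem](2, e => if (e.isLeft) 0 else 1))

        source ~> partition.in
        partition.out(0) ~> Flow[Elem].collect { case Left(t) => t } ~> l.in
        partition.out(1) ~> Flow[Elem].collect { case Right(s) => s } ~> r.in

        ClosedShape
      })

  // Runs the graph on a small sample and returns (failures, successes).
  def demo(): (immutable.Seq[Throwable], immutable.Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("PartitionSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val values: List[Elem] =
        List(Right("A"), Left(new RuntimeException("boom")), Right("B"))
      val (failuresF, successesF) = graph(Source(values)).run()
      (Await.result(failuresF, 10.seconds), Await.result(successesF, 10.seconds))
    } finally system.terminate()
  }
}
```

Unlike EitherFanOut, `Partition` pulls upstream as soon as any outlet has demand and then waits for the chosen outlet, so its backpressure behaviour may differ slightly when one side is slow.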

(imports and initial setup)

import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, InHandler, OutHandler}
import akka.stream._
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val classLoader = getClass.getClassLoader
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader)
implicit val materializer = ActorMaterializer()

val values: List[Either[Throwable, String]] = List(
  Right("B"),
  Left(new Throwable),
  Left(new RuntimeException),
  Right("B"),
  Right("C"),
  Right("G"),
  Right("I"),
  Right("F"),
  Right("T"),
  Right("A")
)

val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.iterator)
