我在学习阿卡。我创建了一个整数源(1到100),并应用了一些流,之后我想将流分成两个接收器:一个接收偶数,另一个接收奇数,但是我不知道该怎么做。有谁能够帮我?非常感谢!看下面的代码:

public class App
{
    final ActorSystem system = ActorSystem.create("example");
    final Materializer materializer = ActorMaterializer.create(system);

    final Source<Integer, NotUsed> source = Source.range(1, 100);

    final Flow<Integer, Integer, NotUsed> addFlow = Flow.fromFunction(n -> addFlow(n));
    final Flow<Integer, Integer, NotUsed> multipleFlow= Flow.fromFunction(n -> multipleFlow(n));
    final Flow<Integer, Integer, NotUsed> mathFlow = addFlow.via(multipleFlow);

    final Flow<Integer, Integer, NotUsed> evenFlow = Flow.of(Integer.class).filter(n -> n%2==0);

    final Sink<Integer, CompletionStage<Done>> evenSink =  Sink.foreach(n -> System.out.println("sink even : " + n));
    final Sink<Integer, CompletionStage<Done>> oddSink =  Sink.foreach(n -> System.out.println("sink odd : " + n));

    public static void main( String[] args )
    {
        App app = new App();
        app.runAkka();
    }

    private void runAkka() {
        RunnableGraph<NotUsed> runnable = source.via(mathFlow).via(evenFlow).to(evenSink);
        runnable.run(materializer);
    }

    private int addFlow(int n) {
        return ++n;
    }

    private int multipleFlow(int n) {
        return n * n;
    }
}

最佳答案

这不是Java解决方案,而是使用GraphDSL方法的Scala代码段。以为它可以作为您的Java版本的参考:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.NotUsed

implicit val system = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()

val source: Source[Int, NotUsed] = Source(1 to 20)

def evenFlow: Flow[Int, Int, NotUsed] =  Flow[Int].filter(_ % 2 == 0)
def oddFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 1)

val evenSink = Sink.foreach[Int](x => print(x + "e "))
val oddSink = Sink.foreach[Int](x => print(x + "o "))

val graph = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](2))

  source ~> bcast.in
  bcast.out(0) ~> evenFlow ~> evenSink
  bcast.out(1) ~> oddFlow ~> oddSink

  ClosedShape
}

RunnableGraph.fromGraph(graph).run
// res1: akka.NotUsed = NotUsed
// 1o 2e 3o 4e 5o 6e 7o 8e 9o 10e 11o 12e 13o 14e 15o 16e 17o 18e 19o 20e

07-27 23:59