我在学习 Akka。我创建了一个整数 Source(1 到 100),并对它应用了一些 Flow,之后我想把这个流分发到两个 Sink:一个接收偶数,另一个接收奇数,但是我不知道该怎么做。有谁能帮帮我吗?非常感谢!请看下面的代码:
/**
 * Minimal Akka Streams example: emits the integers 1..100, maps each value n
 * to (n + 1) * (n + 1), keeps the even results and prints them.
 *
 * {@code oddSink} is declared but not connected to the stream yet.
 */
public class App
{
    final ActorSystem system = ActorSystem.create("example");
    final Materializer materializer = ActorMaterializer.create(system);

    /** Emits the integers 1 through 100. */
    final Source<Integer, NotUsed> source = Source.range(1, 100);

    /** Adds one to every element. */
    final Flow<Integer, Integer, NotUsed> addFlow = Flow.fromFunction(n -> addFlow(n));

    /** Squares every element. */
    final Flow<Integer, Integer, NotUsed> multipleFlow = Flow.fromFunction(n -> multipleFlow(n));

    /** Adds one, then squares. */
    final Flow<Integer, Integer, NotUsed> mathFlow = addFlow.via(multipleFlow);

    /** Lets only even elements through. */
    final Flow<Integer, Integer, NotUsed> evenFlow = Flow.of(Integer.class).filter(n -> n % 2 == 0);

    final Sink<Integer, CompletionStage<Done>> evenSink = Sink.foreach(n -> System.out.println("sink even : " + n));

    // Not wired into the stream yet.
    final Sink<Integer, CompletionStage<Done>> oddSink = Sink.foreach(n -> System.out.println("sink odd : " + n));

    public static void main( String[] args )
    {
        App app = new App();
        app.runAkka();
    }

    /**
     * Runs source -> mathFlow -> evenFlow -> evenSink and shuts the actor
     * system down once the sink has finished.
     */
    private void runAkka() {
        // runWith hands back the sink's materialized CompletionStage, which
        // completes when the stream is done (or fails).
        final CompletionStage<Done> completion =
            source.via(mathFlow).via(evenFlow).runWith(evenSink, materializer);

        // Terminate on success and on failure alike; otherwise the actor
        // system keeps running and the program never exits.
        completion.whenComplete((done, failure) -> system.terminate());
    }

    /** Returns n + 1. */
    private int addFlow(int n) {
        return n + 1;
    }

    /** Returns n squared. */
    private int multipleFlow(int n) {
        return n * n;
    }
}
最佳答案
这不是 Java 解决方案,而是一段使用 GraphDSL 方法的 Scala 代码,
希望它可以作为您编写 Java 版本的参考:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.NotUsed
// Actor system and materializer; both are implicit so they are picked up
// wherever an implicit ActorSystem / Materializer is required below.
// Explicit types keep implicit resolution predictable.
implicit val system: ActorSystem = ActorSystem("Sys")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Emits the integers 1 through 20.
val source: Source[Int, NotUsed] = Source(1 to 20)
/** Lets only the even integers through. */
def evenFlow: Flow[Int, Int, NotUsed] =
  Flow[Int].filter(n => n % 2 == 0)
/**
 * Lets only the odd integers through.
 *
 * Tests `!= 0` rather than `== 1` because the remainder of a negative odd
 * number is -1, so `== 1` would drop negative odd values.
 */
def oddFlow: Flow[Int, Int, NotUsed] =
  Flow[Int].filter(n => n % 2 != 0)
// Print each element followed by a one-letter tag ("e" = even, "o" = odd)
// and a space. Interpolation replaces the deprecated Int + String form.
val evenSink = Sink.foreach[Int](x => print(s"${x}e "))
val oddSink = Sink.foreach[Int](x => print(s"${x}o "))
// Closed graph: the source is broadcast to two branches, each branch filters
// its copy of the elements and feeds its own sink.
val graph = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // Fan-out stage with two outputs; every element is sent to both of them.
  val bcast = builder.add(Broadcast[Int](2))

  source ~> bcast.in
  bcast.out(0) ~> evenFlow ~> evenSink
  bcast.out(1) ~> oddFlow ~> oddSink

  ClosedShape
}

// run() is declared with an empty parameter list, so call it with parentheses.
RunnableGraph.fromGraph(graph).run()
// res1: akka.NotUsed = NotUsed
// 1o 2e 3o 4e 5o 6e 7o 8e 9o 10e 11o 12e 13o 14e 15o 16e 17o 18e 19o 20e