我知道在 Akka Stream 中,一个导出必须连接到一个入口,并且没有自动支持将多个接收器连接到同一个源。所以你必须插入中间对象,比如 Broadcast

我正在将信号处理 DAG 转换为 Akka Stream 图,如果我可以在遍历时动态地将接收器添加到源中,这将对我有很大帮助。如果我有一个自定义的 GraphStage ,我可以拥有自己的 Shape ,其 outlets 集合在 Graph.create 阶段动态增长吗?正常的 DSL 操作 ~> 由这个调用支持:

b.addEdge(importAndGetPort(b), to)

构建器如何在这里“获取”Outlet,我是否能够按需增加我的形状?

如果这不起作用,是否可以“弹出”先前的广播,断开其边缘并在图形构建期间将它们与新的更大的广播连接起来?

最佳答案

GraphDSL 不允许动态改变你的形状。

但是,从 Akka 2.4.10 开始,您可以使用 BroadcastHub(和 MergeHub)。

BroadcastHub 可以为您提供一个具体化为 Source 的 Sink。
此 Source 又可以根据需要多次具体化,以动态附加多个订阅者。

因此,对于 DAG 的节点(例如,入度 = 1 和出度 = 3),你可以有类似的东西

val hubSource = inEdgeSource.toMat(BroadcastHub.sink(bufferSize = ...))(Keep.right).run()

val nodeSink1 = hubSource.to(outEdgeSink1).run()
val nodeSink2 = hubSource.to(outEdgeSink2).run()
val nodeSink3 = hubSource.to(outEdgeSink3).run()

阿卡文档:

http://doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Dynamic_fan-in_and_fan-out_with_MergeHub_and_BroadcastHub

关于scala - Akka 流 : dynamically grow outlets during graph builder phase,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37217739/

10-12 16:32
查看更多