我有以下形式的可运行图。
def getGraph[T](sequence: Seq[T], tickConsumers: Map[T, ActorRef]) =
RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder =>
val ticker = builder.add(new SomeTickProducer))
val broadcast = builder.add(Broadcast[T](sequence.length))
ticker ~> broadcast
sequence.foreach { item =>
broadcast ~>
builder.add(new SomeTickProcesser(item)) ~>
Sink.actorRef(tickConsumers(item), NotUsed)
}
ClosedShape
}
)
所以我的图表是封闭的,不产生任何数据,我无法实现。实际上它只是从外部服务接收一些数据,处理它们并广播给几个消费者。
就
SomeTickProducer
为外部服务创建相当大的负载而言,我一次不应运行更多这样的图。有什么方法可以将我的图形转换为类似 Future
的内容并等待它使用 Await
结束?或者也许有更好的方法来组织一些队列? 最佳答案
为了让图形实现 future ,您需要为 GraphDSL.create
方法提供一个这样做的阶段,例如 Sink.ignore
。然后,您将从 GraphDSL
构建器内部访问该阶段。
假设 SomeTickProducer
是一个最终会完成的 Source
,您可以观察 SomeTickProducer
的终止并将其转发到图的导出。
然后,当您具体化图形时,它将是 Future[Done]
类型。
这将起作用:
def getGraph[T](sequence: Seq[T], tickConsumers: Map[T, ActorRef]) =
RunnableGraph.fromGraph(
GraphDSL.create(Sink.ignore) { implicit builder => out =>
val producer = new SomeTickProducer
val ticker = builder.add(producer)
val broadcast = builder.add(Broadcast[T](sequence.length))
ticker ~> broadcast
sequence.foreach { item =>
broadcast ~>
builder.add(new SomeTickProcesser(item)) ~>
Sink.actorRef(tickConsumers(item), NotUsed)
}
producer.watchTermination()(Keep.none) ~> out
ClosedShape
}
)
关于scala - 等待 ClosedShape 流完成,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47599617/