我有以下形式的可运行图。

def getGraph[T](sequence: Seq[T], tickConsumers: Map[T, ActorRef]) =
  RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder =>
      val ticker = builder.add(new SomeTickProducer))
      val broadcast = builder.add(Broadcast[T](sequence.length))
      ticker ~> broadcast
      sequence.foreach { item =>
        broadcast ~>
        builder.add(new SomeTickProcesser(item)) ~>
        Sink.actorRef(tickConsumers(item), NotUsed)
      }
      ClosedShape
    }
  )

所以我的图表是封闭的,不产生任何数据,我无法实现。实际上它只是从外部服务接收一些数据,处理它们并广播给几个消费者。

SomeTickProducer 为外部服务创建相当大的负载而言,我一次不应运行更多这样的图。有什么方法可以将我的图形转换为类似 Future 的内容并等待它使用 Await 结束?或者也许有更好的方法来组织一些队列?

最佳答案

为了让图形实现 future ,您需要为 GraphDSL.create 方法提供一个这样做的阶段,例如 Sink.ignore 。然后,您将从 GraphDSL 构建器内部访问该阶段。

假设 SomeTickProducer 是一个最终会完成的 Source,您可以观察 SomeTickProducer 的终止并将其转发到图的导出。

然后,当您具体化图形时,它将是 Future[Done] 类型。

这将起作用:

def getGraph[T](sequence: Seq[T], tickConsumers: Map[T, ActorRef]) =
  RunnableGraph.fromGraph(
    GraphDSL.create(Sink.ignore) { implicit builder => out =>
      val producer = new SomeTickProducer
      val ticker = builder.add(producer)
      val broadcast = builder.add(Broadcast[T](sequence.length))
      ticker ~> broadcast
      sequence.foreach { item =>
        broadcast ~>
        builder.add(new SomeTickProcesser(item)) ~>
        Sink.actorRef(tickConsumers(item), NotUsed)
      }
      producer.watchTermination()(Keep.none) ~> out
      ClosedShape
    }
  )

关于scala - 等待 ClosedShape 流完成,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47599617/

10-10 13:53