documentation to implement a KillSwitch 之后,我能够编写此简单示例来停止Source发出无限数。

object KillSwitchSample extends App {
  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
  val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val killSwitch = KillSwitches.shared("switch")

  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val flow = builder.add(Flow[Int].map(_ * 2))
    mySource.via(killSwitch.flow) ~> flow ~> Sink.foreach(println)
    ClosedShape
  }).run()

  Thread.sleep(200)

  killSwitch.shutdown()
}

class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var counter = 1

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, counter)
          counter += 1
        }
      })
    }
}

我的用例与从Source使用OpenCV从视频文件发出帧的意义上有所不同。为什么上游没有取消?我在这里想念什么?
object KillSwitchMinimalMain extends App {
  val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java"))
  System.load(libopencv_java(0))

  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val videoFile = Video("Video.MOV")

  val sourceGraph: Graph[SourceShape[Frame], NotUsed] = new VideoSource(videoFile)
  val videoSource: Source[Frame, NotUsed] = Source.fromGraph(sourceGraph)

  val killSwitch = KillSwitches.shared("switch")

  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val matConversion: FlowShape[Frame, Image] = builder.add(Flow[Frame].map { el => MediaConversion.convertMatToImage(el.frame) })

    videoSource.via(killSwitch.flow) ~> matConversion ~> Sink.foreach(println)

    ClosedShape
  }).run()

  Thread.sleep(200)

  killSwitch.shutdown()
}

class VideoSource(videoFile: Video) extends GraphStage[SourceShape[Frame]] {
  val out: Outlet[Frame] = Outlet("VideoSource")
  override val shape: SourceShape[Frame] = SourceShape(out)
  val log: Logger = LoggerFactory.getLogger(getClass)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private val capture = new VideoCapture()
      private val frame = new Mat()
      private var videoPos: Double = _

      override def preStart(): Unit = {
        capture.open(videoFile.filepath)
        readFrame()
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, Frame(videoPos, frame))
          readFrame()
        }
      })

      private def readFrame(): Unit = {
        if (capture.isOpened) {
          videoPos = capture.get(1)
          log.info(s"reading frame $videoPos")
          capture.read(frame)
        }
      }
    }
}

@svezfaz要求的控制台输出:
13:17:00.046 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 0.0
13:17:00.160 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 1.0
javafx.scene.image.WritableImage@64b06f30
13:17:00.698 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 2.0
javafx.scene.image.WritableImage@1e011979
13:17:00.826 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 3.0
javafx.scene.image.WritableImage@52c9a35c
13:17:00.969 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 4.0
javafx.scene.image.WritableImage@13968f9e
13:17:01.137 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 5.0
javafx.scene.image.WritableImage@6ab783be
// and so on ..

最佳答案

问题是您在自定义阶段引入了阻塞。我不知道OpenCV API,但是我猜想它会在您调用capture.read(frame)时发生。
现在,除非另有指示,否则您的图形将在单个Actor中运行,因此在您的舞台中进行阻止将对整个actor进行阻止。

在您的源代码后强制async边界应该可以解决问题。

还要注意,这里不需要GraphDSL,可以使用via / to DSL紧凑地运行所有内容。

下面的解决方案尝试

object KillSwitchMinimalMain extends App {
  val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java"))

  System.load(libopencv_java(0))
  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val videoFile = Video("Video.MOV")

  val killSwitch = KillSwitches.shared("switch")
  val matConversion = Flow[ByteString].map { _.utf8String }

  Source.fromGraph(new VideoSource())
    .async
    .via(killSwitch.flow)
    .via(matConversion)
    .runForeach(println)

  Thread.sleep(200)

  killSwitch.shutdown()
}

有关底层Akka流的并发模型的更多信息,您可以阅读blogpost

关于scala - Akka流:用于自定义SourceShape的KillSwitch,可从视频文件中发射帧,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41182910/

10-09 15:49