我有以下代码:

class ApiRoutes2[F[_]](implicit F: ConcurrentEffect[F]) extends Http4sDsl[F] {
  var queue = Queue.bounded[F, String](100)

  def createService(queue: Queue[F, String]): F[Unit] = ???

  val service: HttpRoutes[F] = HttpRoutes.of[F] {
    case PUT -> Root / "services" =>
      val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
      val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
        case Text(t, _) => F.delay(println(t))
        case f => F.delay(println(s"Unknown type: $f"))
      }

      // How to "spawn" createService?

      toClientF.flatMap { toClient =>
        WebSocketBuilder[F].build(toClient, fromClient)
      }
  }
}


createService是创建新服务的功能。创建新服务是一个非常复杂的过程,它涉及触发CI管道,等待它们完成,然后以相同的方式触发更多CI管道。它接收到的队列将用于向浏览器报告当前正在执行的操作。

我想同时“产生” createService,并使其运行直到完成。但是,同时我想立即将WebSocket返回给客户端。我也无法在“生成” createService时阻止。

我被卡住了。我只能考虑使用shift,但这意味着for comprehension中的下一行将阻止等待createService完成,然后再将websocket返回给客户端。

我的方法错了吗?我究竟做错了什么?

最佳答案

由于FConcurrentEffect的实例,因此您也有一个Concurrent实例。

因此,您可以使用Concurrent[F].start来将Fiber返回到正在运行的操作(不过,如果您不需要取消/确保完成操作,则可以忽略Fiber)。

  val service: HttpRoutes[F] = HttpRoutes.of[F] {
    case PUT -> Root / "services" =>
      val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
      val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
        case Text(t, _) => F.delay(println(t))
        case f => F.delay(println(s"Unknown type: $f"))
      }

      for {
        toClient <- toClientF
        _ <- Concurrent[F].start(createService)
        websocket <- WebSocketBuilder[F].build(toClient, fromClient)
      } yield websocket
  }

10-06 14:50