这是我的代码

Future.sequence((2 to firstPage.pages).map
{ count =>
    getCommentPage(av, count)
}).map(//do something)


getCommentPage中,一个Http().singleRequest用于获取数据,如下所示:

val responseFuture: Future[HttpResponse] =
    Http(system).singleRequest(HttpRequest(GET, uri = requestUri))
responseFuture
    .map(_.entity)
    .flatMap(_.toStrict(10 seconds)(materializer))
    .map(_.data)
    .map(_.utf8String)
    .map((jsonString: String) => {
        //do something to extract data
    }


较小的firstPage.pages会很好地工作,但是当firstPage.pages较大(大约50或更大)时,会有一个例外:

akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [32]. This means that the request queue of this pool (HostConnectionPoolSetup(api.bilibili.cn,80,ConnectionPoolSetup(ConnectionPoolSettings(4,0,5,32,1,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.0.9),10 seconds,1 minute,512,None,<function0>,List(),ParserSettings(2048,16,64,64,8192,64,8388608,256,1048576,Strict,RFC6265,true,Full,Error,Map(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, Content-MD5 -> 0, Date -> 0, If-Match -> 0, If-None-Match -> 0, User-Agent -> 32),false,akka.stream.impl.ConstantFun$$$Lambda$244/19208387@4780bf,akka.stream.impl.ConstantFun$$$Lambda$244/19208387@4780bf,akka.stream.impl.ConstantFun$$$Lambda$245/6903324@1d25a2e),None),TCPTransport),akka.http.scaladsl.HttpConnectionContext$@796a3e,akka.event.MarkerLoggingAdapter@1cc552a))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.


怎么解决呢?

最佳答案

您正在寻找的东西是专用的host connection pool
简化后的结果代码如下:

val poolClientFlow =
    Http().cachedHostConnectionPool[HttpRequest](host, port)

def performRequest(request: HttpRequest): Future[HttpResponse] =
    Source
      .single(request)
      .via(poolClientFlow)
      .mapAsync(1) {
        case (response, _) =>
          Future.fromTry(response)
      }
      .runWith(Sink.head)


确保您会打电话

response.discardEntityBytes()


或只是取消编组以防止资源泄漏。

他们在文档中警告说这是一种反模式,但是如果您有足够的内存并且不需要任何队列管理,它实际上可以很好地运行。

09-05 08:37