This article walks through an Akka HTTP connection pool that hangs after a couple of hours of running, and the fix that was recommended for it.

Problem description

I have an HTTP connection pool that hangs after a couple of hours of running:

```scala
private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = {
  val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host)
  Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew)
    .via(pool).toMat(Sink.foreach {
      case ((Success(res), p)) => p.success(res)
      case ((Failure(e), p))   => p.failure(e)
    })(Keep.left).run
}
```

I enqueue items with:

```scala
private def enqueue(uri: Uri): Future[HttpResponse] = {
  val promise = Promise[HttpResponse]
  val request = HttpRequest(uri = uri) -> promise
  queue.offer(request).flatMap {
    case Enqueued => promise.future
    case _        => Future.failed(ConnectionPoolDroppedRequest)
  }
}
```

and resolve the response like this:

```scala
private def request(uri: Uri): Future[HttpResponse] = {
  def retry = {
    Thread.sleep(config.dispatcherRetryInterval)
    logger.info(s"retrying")
    request(uri)
  }

  logger.info("req-start")
  for {
    response <- enqueue(uri)
    _ = logger.info("req-end")
    finalResponse <- response.status match {
      case TooManyRequests => retry
      case OK              => Future.successful(response)
      case _               => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString))
    }
  } yield finalResponse
}
```

The result of this function is then always transformed if the Future is successful:

```scala
def get(uri: Uri): Future[Try[JValue]] = {
  for {
    response <- request(uri)
    json     <- Unmarshal(response.entity).to[Try[JValue]]
  } yield json
}
```

Everything works fine for a while, and then all I see in the logs are req-start entries and no req-end.

My akka configuration is like this:

```
akka {
  actor.deployment.default {
    dispatcher = "my-dispatcher"
  }
}

my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 256
    parallelism-factor = 128.0
    parallelism-max = 1024
  }
}

akka.http {
  host-connection-pool {
    max-connections = 512
    max-retries = 5
    max-open-requests = 16384
    pipelining-limit = 1
  }
}
```

I'm not sure if this is a configuration problem or a code problem. My parallelism and connection numbers are so high because without them I get a very poor req/s rate (I want to request as fast as possible; I have other rate-limiting code to protect the server).

Answer

You are not consuming the entity of the responses you get back from the server. Citing the docs: "The entity comes in the form of a Source[ByteString, _] which needs to be run to avoid resource starvation."

If you don't need to read the entity, the simplest way to consume the entity bytes is to discard them, by using res.discardEntityBytes() (you can attach a callback by adding, e.g., .future().map(...)). This page in the docs describes all the alternatives, including how to read the bytes if needed.
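As a rough illustration of that advice, here is a minimal, self-contained sketch of draining a response body with discardEntityBytes(). The object name, the example.com URL and the Akka 2.5-era ActorMaterializer are assumptions made for the example; they are not part of the original answer.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer

object DiscardEntityExample extends App {
  implicit val system: ActorSystem = ActorSystem("discard-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer() // assumed Akka 2.5-style materializer
  import system.dispatcher

  // Fire a single request and drop the body: discardEntityBytes() drains the underlying
  // Source[ByteString, _] so the pooled connection can be reused.
  Http().singleRequest(HttpRequest(uri = "https://example.com/")).foreach { res =>
    println(s"status: ${res.status}")
    val discarded = res.discardEntityBytes()
    discarded.future.foreach(_ => system.terminate()) // callback once draining completes
  }
}
```

If you do need the body, read it (for example via toStrict or Unmarshal) instead of discarding it; either way, every response entity must be consumed.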
--- EDIT

After more code/info was provided, it is clear that the resource consumption is not the problem. There is another big red flag in this implementation, namely the Thread.sleep in the retry method. This is a blocking call that is very likely to starve the threading infrastructure of your underlying actor system. A full-blown explanation of why this is dangerous is provided in the docs.

Try changing that and using akka.pattern.after (docs). Example below:

```scala
def retry = akka.pattern.after(200 millis, using = system.scheduler)(request(uri))
```
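For context, a self-contained sketch of the same non-blocking retry idea follows. The names (NonBlockingRetryExample, flakyCall, withRetry), the failure-based retry condition and the attempt limit are invented for illustration; only the akka.pattern.after call and the 200 ms delay come from the answer above.

```scala
import akka.actor.ActorSystem
import akka.pattern.after

import scala.concurrent.Future
import scala.concurrent.duration._

object NonBlockingRetryExample extends App {
  implicit val system: ActorSystem = ActorSystem("retry-example")
  import system.dispatcher

  // Stand-in for the asker's request/enqueue call: fails twice, then succeeds.
  def flakyCall(attempt: Int): Future[String] =
    if (attempt < 3) Future.failed(new RuntimeException(s"attempt $attempt failed"))
    else Future.successful("ok")

  // Retry without blocking: akka.pattern.after hands the delay to the actor
  // system's scheduler instead of parking a dispatcher thread with Thread.sleep.
  def withRetry(attempt: Int = 1, maxAttempts: Int = 5): Future[String] =
    flakyCall(attempt).recoverWith {
      case _ if attempt < maxAttempts =>
        after(200.millis, using = system.scheduler)(withRetry(attempt + 1, maxAttempts))
    }

  withRetry().foreach { result =>
    println(result) // prints "ok" after two delayed retries
    system.terminate()
  }
}
```

The key difference from the original retry is that no thread is blocked while waiting, so the dispatcher stays free to make progress on other requests.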