中的连接池使用源队列实现线程安全吗

中的连接池使用源队列实现线程安全吗

本文介绍了akka-http 中的连接池使用源队列实现线程安全吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

参考下面提到的实现:

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()

提供来自多个线程的队列 http 请求是否线程安全?如果不是,那么实现此类要求的最佳方法是什么?也许使用专门的演员?

Is it thread safe to offer the queue http requests from multiple threads ?If it isn't, what is the best way to implement such requirement ? using a dedicated actor perhaps ?

推荐答案

正如@frederic-a 所述,SourceQueue 不是线程安全的解决方案.

As correctly stated by @frederic-a, SourceQueue is not a thread safe solution.

也许合适的解决方案是使用 MergeHub(参见 文档 了解更多详情).这有效地允许您分两个阶段运行图表.

Perhaps a fit solution would be to use a MergeHub(see docs for more details). This effectively allows you to run your graph in two stages.

  1. 从你的集线器到你的接收器(这变成了接收器)
  2. 将在点 1 实现的接收器分发给您的用户.Sink 实际上被设计为分布式的,所以这是非常安全的.
  1. from your hub to your sink (this materializes to a sink)
  2. distribute the sink materialized at point 1 to your users. Sinks are actually designed to be distributed, so this is perfectly safe.

根据 MergeHub 行为

如果消费者跟不上那么所有的生产者背压.

下面的代码示例:

val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
  MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
  .via(poolClientFlow)
  .toMat(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p))    => p.failure(e)
  }))(Keep.left)
  .run()

// on the user threads

val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ???
source.runWith(reqSink)

这篇关于akka-http 中的连接池使用源队列实现线程安全吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 10:25