本文介绍了akka-http 中的连接池使用源队列实现线程安全吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
参考下面提到的实现:
http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html
val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
提供来自多个线程的队列 http 请求是否线程安全?如果不是,那么实现此类要求的最佳方法是什么?也许使用专门的演员?
Is it thread safe to offer the queue http requests from multiple threads ?If it isn't, what is the best way to implement such requirement ? using a dedicated actor perhaps ?
推荐答案
正如@frederic-a 所述,SourceQueue
不是线程安全的解决方案.
As correctly stated by @frederic-a, SourceQueue
is not a thread safe solution.
也许合适的解决方案是使用 MergeHub
(参见 文档 了解更多详情).这有效地允许您分两个阶段运行图表.
Perhaps a fit solution would be to use a MergeHub
(see docs for more details). This effectively allows you to run your graph in two stages.
- 从你的集线器到你的接收器(这变成了接收器)
- 将在点 1 实现的接收器分发给您的用户.
Sink
实际上被设计为分布式的,所以这是非常安全的.
- from your hub to your sink (this materializes to a sink)
- distribute the sink materialized at point 1 to your users.
Sink
s are actually designed to be distributed, so this is perfectly safe.
根据 MergeHub
行为
如果消费者跟不上那么所有的生产者背压.
下面的代码示例:
val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
// on the user threads
val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ???
source.runWith(reqSink)
这篇关于akka-http 中的连接池使用源队列实现线程安全吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!