Problem description
How do you throttle a Flow in the latest Akka (2.4.6)? I'd like to throttle an HTTP client flow so that it sends at most 3 requests per second. I found the following example online, but it targets an older Akka, and the akka-streams API has changed so much that I can't figure out how to rewrite it.
def throttled[T](rate: FiniteDuration): Flow[T, T] = {
val tickSource: Source[Unit] = TickSource(rate, rate, () => ())
val zip = Zip[T, Unit]
val in = UndefinedSource[T]
val out = UndefinedSink[T]
PartialFlowGraph { implicit builder =>
import FlowGraphImplicits._
in ~> zip.left ~> Flow[(T, Unit)].map { case (t, _) => t } ~> out
tickSource ~> zip.right
}.toFlow(in, out)
}
Here is my best attempt so far:
def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val ticker = Source.tick(rate, rate, Unit)
val zip = builder.add(Zip[T, Unit.type])
val map = Flow[(T, Unit.type)].map { case (value, _) => value }
val messageExtractor = builder.add(map)
val in = Inlet[T]("Req.in")
val out = Outlet[T]("Req.out")
out ~> zip.in0
ticker ~> zip.in1
zip.out ~> messageExtractor.in
FlowShape.of(in, messageExtractor.out)
})
It throws an exception in my main flow, though:
private val queueHttp = Source.queue[(HttpRequest, (Any, Promise[(Try[HttpResponse], Any)]))](1000, OverflowStrategy.backpressure)
.via(throttleFlow(rate))
.via(poolClientFlow)
.mapAsync(4) {
case (util.Success(resp), any) =>
val strictFut = resp.entity.toStrict(5 seconds)
strictFut.map(ent => (util.Success(resp.copy(entity = ent)), any))
case other =>
Future.successful(other)
}
.toMat(Sink.foreach({
case (triedResp, (value: Any, p: Promise[(Try[HttpResponse], Any)])) =>
p.success(triedResp -> value)
case _ =>
throw new RuntimeException()
}))(Keep.left)
.run
where poolClientFlow is Http()(system).cachedHostConnectionPool[Any](baseDomain).
The exception is:
Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph.
at scala.Predef$.require(Predef.scala:219)
at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204)
Recommended answer
Here is an attempt that uses the throttle method mentioned by @Qingwei. The key is not to use bindAndHandle(), but to use bind() and throttle the flow of incoming connections before handling them. The code is taken from the implementation of bindAndHandle(), but leaves out some error handling for simplicity. Please don't do that in production.
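import java.time.LocalDateTime
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.IncomingConnection
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.server.Directives._
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Flow, Keep, Sink}
import scala.concurrent.Future
import scala.concurrent.duration._
implicit val system = ActorSystem("test")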
implicit val mat = ActorMaterializer()
import system.dispatcher
val maxConcurrentConnections = 4
val handler: Flow[HttpRequest, HttpResponse, NotUsed] = complete(LocalDateTime.now().toString)
def handleOneConnection(incomingConnection: IncomingConnection): Future[Done] =
incomingConnection.flow
.watchTermination()(Keep.right)
.joinMat(handler)(Keep.left)
.run()
Http().bind("127.0.0.1", 8080)
.throttle(3, 1.second, 1, ThrottleMode.Shaping)
.mapAsyncUnordered(maxConcurrentConnections)(handleOneConnection)
.to(Sink.ignore)
.run()
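The code above throttles incoming server connections, whereas the question asks about a client-side flow. The same built-in throttle operator can be applied to any Flow, so a minimal sketch for the client case might look like the following. The names Throttling, throttled and zipThrottle are hypothetical and chosen for illustration only. The second function is one possible repair of the tick-and-zip attempt from the question: the exception there appears to come from the standalone Inlet and Outlet, which were never added to the builder, so the sketch exposes zip.in0 and the extractor's outlet as the shape instead.

```scala
import akka.NotUsed
import akka.stream.{FlowShape, ThrottleMode}
import akka.stream.scaladsl.{Flow, GraphDSL, Source, Zip}
import scala.concurrent.duration._

// Hypothetical helper object; not part of Akka.
object Throttling {

  // Built-in operator: lets through at most `perSecond` elements per second
  // and backpressures upstream (ThrottleMode.Shaping) instead of failing.
  def throttled[T](perSecond: Int): Flow[T, T, NotUsed] =
    Flow[T].throttle(perSecond, 1.second, perSecond, ThrottleMode.Shaping)

  // Tick-and-zip variant: emits one element per `rate` interval.
  def zipThrottle[T](rate: FiniteDuration): Flow[T, T, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val ticker  = Source.tick(rate, rate, ())
      val zip     = builder.add(Zip[T, Unit]())
      val extract = builder.add(Flow[(T, Unit)].map { case (value, _) => value })

      // Pair every element with a tick, then drop the tick again.
      ticker  ~> zip.in1
      zip.out ~> extract.in

      // Expose ports that belong to the graph: the unconnected zip inlet
      // and the extractor's outlet.
      FlowShape(zip.in0, extract.out)
    })
}
```

Under these assumptions, the pipeline from the question would use .via(Throttling.throttled(3)) in place of .via(throttleFlow(rate)), leaving poolClientFlow and the rest of the stream unchanged.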