本文介绍了您如何在最新的Akka(2.4.6)中限制Flow?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

您如何在最新的Akka(2.4.6)中限制Flow?我想限制Http客户端流量,以将请求数量限制为每秒3个请求。我在网上找到了以下示例,但这是针对旧Akka和Akka-streams API的更改,以至于我不知道如何重写它。

How do you throttle Flow in the latest Akka (2.4.6) ? I'd like to throttle Http client flow to limit number of requests to 3 requests per second. I found following example online but it's for old Akka and akka-streams API changed so much that I can't figure out how to rewrite it.

def throttled[T](rate: FiniteDuration): Flow[T, T] = {
  val tickSource: Source[Unit] = TickSource(rate, rate, () => ())
  val zip = Zip[T, Unit]
  val in = UndefinedSource[T]
  val out = UndefinedSink[T]
  PartialFlowGraph { implicit builder =>
    import FlowGraphImplicits._
    in ~> zip.left ~> Flow[(T, Unit)].map { case (t, _) => t } ~> out
    tickSource ~> zip.right
  }.toFlow(in, out)
}

此处是迄今为止我最好的尝试

Here is my best attempt so far

def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val ticker = Source.tick(rate, rate, Unit)

  val zip = builder.add(Zip[T, Unit.type])
  val map = Flow[(T, Unit.type)].map { case (value, _) => value }
  val messageExtractor = builder.add(map)

  val in = Inlet[T]("Req.in")
  val out = Outlet[T]("Req.out")

  out ~> zip.in0
  ticker ~> zip.in1
  zip.out ~> messageExtractor.in

  FlowShape.of(in, messageExtractor.out)
})

虽然它会在我的主要流程中引发异常:)

it throws exception in my main flow though :)

private val queueHttp = Source.queue[(HttpRequest, (Any, Promise[(Try[HttpResponse], Any)]))](1000, OverflowStrategy.backpressure)
  .via(throttleFlow(rate))
  .via(poolClientFlow)
  .mapAsync(4) {
    case (util.Success(resp), any) =>
      val strictFut = resp.entity.toStrict(5 seconds)
      strictFut.map(ent => (util.Success(resp.copy(entity = ent)), any))
    case other =>
      Future.successful(other)
  }
  .toMat(Sink.foreach({
    case (triedResp, (value: Any, p: Promise[(Try[HttpResponse], Any)])) =>
      p.success(triedResp -> value)
    case _ =>
      throw new RuntimeException()
  }))(Keep.left)
  .run

其中 poolClientFlow Http()(system).cachedHostConnectionPool [Any](baseDomain)

例外是:

Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph.
    at scala.Predef$.require(Predef.scala:219)
    at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204)


推荐答案

这里尝试使用@Qingwei提到的节流方法。关键是不要使用 bindAndHandle(),而要使用 bind()并限制输入连接的流量处理它们。该代码来自的实现,但是为简化起见,省略了一些错误处理。

Here is an attempt that uses the throttle method as mentioned by @Qingwei. The key is to not use bindAndHandle(), but to use bind() and throttle the flow of incoming connections before handling them. The code is taken from the implementation of bindAndHandle(), but leaves out some error handling for simplicity. Please don't do that in production.

implicit val system = ActorSystem("test")
implicit val mat = ActorMaterializer()
import system.dispatcher
val maxConcurrentConnections = 4

val handler: Flow[HttpRequest, HttpResponse, NotUsed] = complete(LocalDateTime.now().toString)

def handleOneConnection(incomingConnection: IncomingConnection): Future[Done] =
  incomingConnection.flow
        .watchTermination()(Keep.right)
        .joinMat(handler)(Keep.left)
        .run()

Http().bind("127.0.0.1", 8080)
  .throttle(3, 1.second, 1, ThrottleMode.Shaping)
  .mapAsyncUnordered(maxConcurrentConnections)(handleOneConnection)
  .to(Sink.ignore)
  .run()

这篇关于您如何在最新的Akka(2.4.6)中限制Flow?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 22:37