问题描述
我正在使用。
私有val(queueSource,connectionPool)= Source.queue [(HttpRequest,Promise [HttpResponse])] [queueSize,OverflowStrategy。 backpressure).async
.viaMat(poolFlow)(Keep.both)
.toMat(
Sink.foreach({
case((Success(resp),p))= >
p.success(resp)
case((Failure(e),p))=> p.failure(e)
})
)(保持左)
.run()
我有很多要求争夺连接池,但出现以下错误:
java.lang.IllegalStateException:您必须等待以前的报价被解析为在akka.stream.impl.QueueSource $$ anon $ 1.akka $ stream $ impl $ QueueSource $$ anon $$ bufferElem(QueueSource.scala:84)
发送另一个请求akka.stream.impl .QueueSource $$ anon $ 1 $$ anonfun $ 1.apply(QueueSource.scala:94)
在akka.stream.impl.QueueSource $$ anon $ 1 $$ anonfun $ 1.apply(QueueSource.scala:91 )
在akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447)
在akka.stream.impl.fusing.GraphInterpreterShell $ AsyncInput.execute(ActorGraphInterpreter.scala:464)
在akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559)
在akka.stream.impl.fusing.ActorGraphInterpreter.akka $ stream $ impl $ fusing $ ActorGraphInterpreter $$ processEvent( ActorGraphInterpreter.scala:741)
在akka.stream.impl.fusing.ActorGraphInterpreter $$ anonfun $ receive $ 1.applyOrElse(ActorGraphInterpreter.scala:756)
在akka.actor.Actor $ class.aroundReceive( Actor.scala:517)
在akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666)
在akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
在akka.actor.ActorCell.invoke(ActorCell.scala:496)
在akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
在akka.dispatch.Mailbox.run(Mailbox .s cala:224)
在akka.dispatch.Mailbox.exec(Mailbox.scala:234)
在akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
在akka .dispatch.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339)
在akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在akka.dispatch.forkjoin.ForkJoinWorkerThread .run(ForkJoinWorkerThread.java:107)
我尝试添加.async,但反压力仍然没有解决上面的错误是什么意思,以及如何进行问题研究?
您已经在构建 Source
和 Source.queue
对象方法,因此我认为不可能直接对任何功能调用施加反压力 queue.offer
。但是,您的问题可能会以其他方式解决。
不同的 OverflowStrategy
您可以将策略更改为 OverflowStrategy.dropHead
或 OverflowStrategy.dropTail
。如果与 queue.offer
调用的速率相比,您的 queueSize
足够大,则可能会满足您的需求。 / p>
I'm using host-level API with a queue.
private val (queueSource, connectionPool) = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure).async
.viaMat(poolFlow)(Keep.both)
.toMat(
Sink.foreach({
case ((Success(resp), p)) =>
p.success(resp)
case ((Failure(e), p)) => p.failure(e)
})
)(Keep.left)
.run()
I have a lot of request racing for connections in the connection pool but I get the following error:
java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request
at akka.stream.impl.QueueSource$$anon$1.akka$stream$impl$QueueSource$$anon$$bufferElem(QueueSource.scala:84)
at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:94)
at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:91)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:464)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:741)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:756)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I tried adding .async but back pressure still does not kick in. What does the error above mean and how to go about investigating the problem?
You are already constructing a Source
with the Source.queue
object method so I don't think it is possible to directly apply back pressure to whatever functionality is calling queue.offer
. However, your problem can likely be solved in a different way.
Different OverflowStrategy
You could change the strategy to something like OverflowStrategy.dropHead
or OverflowStrategy.dropTail
. If your queueSize
is large enough compared to the rate of queue.offer
invocations then this will probably suite your needs.
这篇关于如何启用Source.Queue背压的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!