我需要处理通量中的反压力,该通量接收对象列表作为输入。列表的大小从几百个到几十万个元素不等。
实际的代码是:
Flux.fromIterable(alarms)
.limitRate(parallelism)
.parallel(parallelism)
.runOn(Schedulers.elastic(), bufferSize)
.doOnNext(reactiveHandleDataService::handleAlarm)
;
参数“ limitRange”只是强制拒绝超过一定大小的列表,这是我不想要的。我需要将接收到的所有数据都提供给reactHandleDataService,我不会丢失任何消息。
在这种情况下如何处理背压?我没有找到很多很好地说明问题的示例,尤其是使用可迭代的源。
我正在使用Californium-SR3作为反应堆的发布版,这是Spring Boot应用程序的一部分。
最佳答案
如果处理后无法支持许多数据以发送对新数据的请求,则是handleAlarm作业具有背压1.如果您无法使用backPressure处理alarm,则可以添加延迟
Flux.fromIterable(alarms)
.limitRate(parallelism)
.delayElements(Duration.ofMillis(10))
.doOnNext(reactiveHandleDataService::handleAlarm)
.subscribeOn(Schedulars.elastic)
;
如果您想要背压,为什么要并行运行
关于java - 助焊剂起死回生如何应对背压,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53884164/