我正在编写一个处理位图的示例应用程序。该过程可以由滑块控制,因此当滑块位置更改时,我将生成另一个位图。

当用户拖动滑块时,它每秒发出约10-20个事件。处理位图大约需要1秒钟,因此处理队列很快就被请求卡住了。

在我看来,这是一个很好的反压示例,但是我不知道如何使用FlowableBackpressureStrategy这样的东西来正确处理它。而且,我无法使这个小样本工作:

val pubsub = PublishSubject.create<Int>()

pubsub
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(computation())
.subscribe {
      Timber.d("consume %d - %s", it, Thread.currentThread().name)
      Thread.sleep(3000)
}

for (i in 0 .. 1000) {
      Timber.d("emit %d - %s", i, Thread.currentThread().name)
      pubsub.onNext(i)
}

好吧,我希望这段代码可以通过PublishSubject发出1000个整数,但是只要处理每个过程花费3秒,就应该丢弃999个整数,只应处理“0”和“1000” ...

但是在日志中,我看到我的所有整数都是一个一个地缓慢处理的,背压策略被忽略了。实际上,toFlowable(...)表达式似乎什么也没做。无论有没有背压,我都会看到1000的排放量,再加上几分钟的消耗量。

我在这里想念什么?如何删除中间元素并仅消耗最新的可用元素?

解决了:
observeOn(computation())实际上是observeOn(computation(), delayErrors = false, bufferSize = 128)。要查看实际的背压,请在调用observeOn(...)时减小bufferSize

最佳答案

这可能与observeOn(computation())有关。根据支持线程的不同,可能会自动限制它。物品的发射排队。因此,Flowable上没有任何背压。

尝试将这些线程更改放在toFlowable(LATEST)之前,或使用其他的Scheduler(不太宽容),或将更多项目放入pubsub中。

您也可以使用 observeOn(Scheduler scheduler, boolean, int) 来强制bufferSize。

08-18 18:13