我正在编写一个处理位图的示例应用程序。该过程可以由滑块控制,因此当滑块位置更改时,我将生成另一个位图。
当用户拖动滑块时,它每秒发出约10-20个事件。处理位图大约需要1秒钟,因此处理队列很快就被请求卡住了。
在我看来,这是一个很好的反压示例,但是我不知道如何使用Flowable
和BackpressureStrategy
这样的东西来正确处理它。而且,我无法使这个小样本工作:
val pubsub = PublishSubject.create<Int>()
pubsub
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(computation())
.subscribe {
Timber.d("consume %d - %s", it, Thread.currentThread().name)
Thread.sleep(3000)
}
for (i in 0 .. 1000) {
Timber.d("emit %d - %s", i, Thread.currentThread().name)
pubsub.onNext(i)
}
好吧,我希望这段代码可以通过
PublishSubject
发出1000个整数,但是只要处理每个过程花费3秒,就应该丢弃999个整数,只应处理“0”和“1000” ...但是在日志中,我看到我的所有整数都是一个一个地缓慢处理的,背压策略被忽略了。实际上,
toFlowable(...)
表达式似乎什么也没做。无论有没有背压,我都会看到1000的排放量,再加上几分钟的消耗量。我在这里想念什么?如何删除中间元素并仅消耗最新的可用元素?
解决了:
observeOn(computation())
实际上是observeOn(computation(), delayErrors = false, bufferSize = 128)
。要查看实际的背压,请在调用observeOn(...)
时减小bufferSize 最佳答案
这可能与observeOn(computation())
有关。根据支持线程的不同,可能会自动限制它。物品的发射排队。因此,Flowable
上没有任何背压。
尝试将这些线程更改放在toFlowable(LATEST)
之前,或使用其他的Scheduler
(不太宽容),或将更多项目放入pubsub
中。
您也可以使用 observeOn(Scheduler scheduler, boolean, int)
来强制bufferSize。