我正在玩RxJava(准确地说是RxKotlin)。在这里,我有以下Observable
:
fun metronome(ms: Int) = observable<Int> {
var i = 0;
while (true) {
if (ms > 0) {
Thread.sleep(ms.toLong())
}
if (it.isUnsubscribed()) {
break
}
it.onNext(++i)
}
}
我想将其中的一些合并并同时运行。他们忽略背压,因此必须将背压运算符应用于它们。
然后我创建
val cores = Runtime.getRuntime().availableProcessors()
val threads = Executors.newFixedThreadPool(cores)
val scheduler = Schedulers.from(threads)
然后合并
metronome
:val o = Observable.merge(listOf(metronome(0),
metronome(1000).map { "---------" })
.map { it.onBackpressureBlock().subscribeOn(scheduler) })
.take(5000, TimeUnit.MILLISECONDS)
第一个应该连续发射物品。
如果在运行的最后3秒钟这样做,则会得到以下输出:
...
[RxComputationThreadPool-5]: 369255
[RxComputationThreadPool-5]: 369256
[RxComputationThreadPool-5]: 369257
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
似乎
Observable
在同一线程上订阅,并且第一个可观察对象被阻止3+秒。但是,当我交换
onBackpressureBlock()
和subscribeOn(scheduler)
调用时,输出将达到我的期望,输出将在整个执行过程中合并。对我来说,在RxJava中调用订单很重要,但是我不太了解在这种特殊情况下会发生什么。
那么,当在
onBackpressureBlock
之前应用subscribeOn
运算符时会发生什么,如果在之后应用? 最佳答案
onBackpressureBlock
运算符是失败的实验;它需要注意在哪里申请。例如,subscribeOn().onBackpressureBlock()
起作用,但反之则不行。
RxJava具有称为interval
的非阻塞定期计时器,因此您无需自己滚动。