我正在玩RxJava(准确地说是RxKotlin)。在这里,我有以下Observable

fun metronome(ms: Int) = observable<Int> {
    var i = 0;
    while (true) {
        if (ms > 0) {
            Thread.sleep(ms.toLong())
        }
        if (it.isUnsubscribed()) {
            break
        }
        it.onNext(++i)
    }
}


我想将其中的一些合并并同时运行。他们忽略背压,因此必须将背压运算符应用于它们。

然后我创建

val cores = Runtime.getRuntime().availableProcessors()
val threads = Executors.newFixedThreadPool(cores)
val scheduler = Schedulers.from(threads)


然后合并metronome

val o = Observable.merge(listOf(metronome(0),
                                metronome(1000).map { "---------" })
                         .map { it.onBackpressureBlock().subscribeOn(scheduler) })
                  .take(5000, TimeUnit.MILLISECONDS)


第一个应该连续发射物品。
如果在运行的最后3秒钟这样做,则会得到以下输出:

...
[RxComputationThreadPool-5]: 369255
[RxComputationThreadPool-5]: 369256
[RxComputationThreadPool-5]: 369257
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------


似乎Observable在同一线程上订阅,并且第一个可观察对象被阻止3+秒。

但是,当我交换onBackpressureBlock()subscribeOn(scheduler)调用时,输出将达到我的期望,输出将在整个执行过程中合并。

对我来说,在RxJava中调用订单很重要,但是我不太了解在这种特殊情况下会发生什么。

那么,当在onBackpressureBlock之前应用subscribeOn运算符时会发生什么,如果在之后应用?

最佳答案

onBackpressureBlock运算符是失败的实验;它需要注意在哪里申请。例如,subscribeOn().onBackpressureBlock()起作用,但反之则不行。

RxJava具有称为interval的非阻塞定期计时器,因此您无需自己滚动。

09-29 23:45