我一直在使用Observable.fromEmitter()
作为Observable.create()
的绝佳替代品。我最近遇到了一些奇怪的行为,我无法完全弄清为什么会这样。我真的很感谢一个对背压有一定了解的人,并且调度程序也对此进行了介绍。
public final class EmitterTest {
public static void main(String[] args) {
Observable<Integer> obs = Observable.fromEmitter(emitter -> {
for (int i = 1; i < 1000; i++) {
if (i % 5 == 0) {
sleep(300L);
}
emitter.onNext(i);
}
emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);
obs.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
sleep(10000L);
}
private static void sleep(Long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
该应用程序的输出是
Received 1
Received 2
...
Received 128
然后它仍然停留在128(可能是因为这是RxJava的默认缓冲区大小)。
如果我将
fromEmitter()
中指定的模式更改为BackpressureMode.NONE
,则该代码将按预期工作。如果我删除对observeOn()
的调用,它也可以按预期工作。有谁能解释为什么会这样? 最佳答案
这是同一个池的死锁情况。 subscribeOn
在正在使用的同一线程上调度下游request
,但是如果该线程正忙于睡眠/发射循环,则该请求将永远不会传递到fromEmitter
,因此,一段时间后LATEST
开始丢弃元素,直到最后如果主源等待足够长的时间,则将传递最后一个值(999)。 (与我们删除的onBackpressureBlock
的情况类似。)
如果subscribeOn
不对请求进行此调度,则该示例将正常工作。
我已经打开an issue来解决问题。
现在的解决方法是将更大的缓冲区大小与observeOn
一起使用(有重载)或使用fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()