我在下面尝试过。
public static void main(String[] args) throws Exception {
Observable.interval(100L, TimeUnit.MILLISECONDS)
.onBackpressureDrop().subscribe(new Subscriber<Long>() {
@Override
public void onStart() {
request(1L);
}
@Override
public void onNext(Long t) {
System.out.println("received: " + t);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
});
TimeUnit.SECONDS.sleep(10L);
}
我希望订阅者会收到
0, 10, 20, 30...
项目,因为onBackpressureDrop方法会在请求之前删除项目。但是结果是
0, 1, 2, 3...
。另外,我尝试了
onBackpressureLatest()
,但结果却是相同的0, 1, 2, 3...
。在我看来,这与
onBackpressureBuffer
相同。我会误解这种方法吗?
如果是这样,我如何尝试onBackpressureDrop方法可以用作Javadoc中的大理石图?
最佳答案
onBackpressureDrop
在这里不做任何事情,因为在这种情况下,没有“反压”。在subscribe
方法中执行的操作是在每次发射后休眠1秒钟,以阻塞Observable.interval
方法在其上创建新值的线程。因此,Observable.interval
不会每100毫秒生成一个值,而是每秒只能生成一次!
我建议您在observeOn
和onBackpressureDrop
之间放置一个subscribe
,以使值的生成和在subscribe
中执行的操作位于不同的线程上。要查看效果,请使用较小的缓冲区大小(例如1)的this overload。据我所知,任何类型的调度程序都可以执行。