我在下面尝试过。

public static void main(String[] args) throws Exception {
  Observable.interval(100L, TimeUnit.MILLISECONDS)
    .onBackpressureDrop().subscribe(new Subscriber<Long>() {

    @Override
    public void onStart() {
      request(1L);
    }

    @Override
    public void onNext(Long t) {
      System.out.println("received: " + t);
      try {
        Thread.sleep(1000L);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      request(1);
    }

    @Override
    public void onCompleted() {
      System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
      e.printStackTrace();
    }
  });

  TimeUnit.SECONDS.sleep(10L);
}


我希望订阅者会收到0, 10, 20, 30...项目,因为onBackpressureDrop方法会在请求之前删除项目。
但是结果是0, 1, 2, 3...

另外,我尝试了onBackpressureLatest(),但结果却是相同的0, 1, 2, 3...

在我看来,这与onBackpressureBuffer相同。

我会误解这种方法吗?
如果是这样,我如何尝试onBackpressureDrop方法可以用作Javadoc中的大理石图?

最佳答案

onBackpressureDrop在这里不做任何事情,因为在这种情况下,没有“反压”。在subscribe方法中执行的操作是在每次发射后休眠1秒钟,以阻塞Observable.interval方法在其上创建新值的线程。因此,Observable.interval不会每100毫秒生成一个值,而是每秒只能生成一次!

我建议您在observeOnonBackpressureDrop之间放置一个subscribe,以使值的生成和在subscribe中执行的操作位于不同的线程上。要查看效果,请使用较小的缓冲区大小(例如1)的this overload。据我所知,任何类型的调度程序都可以执行。

10-06 09:19