我一直在寻找新的rx java 2,但我不确定我是否已经了解backpressure的想法了...

我知道我们有不支持Observablebackpressure和具有它的Flowable

因此,基于示例,可以说我有flowableinterval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

在大约128个值之后,这将崩溃,这很明显我消耗的速度比获取项目要慢。

但是然后我们与Observable相同
     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

即使我延迟使用它,它仍然完全不会崩溃。为了使Flowable工作,可以说我放了onBackpressureDrop运算符,当机了,但并非所有值都发出了。

因此,我目前在脑海中找不到答案的基本问题是,当我可以使用普通backpressure仍接收所有值而不管理Observable时,为什么还要关心buffer呢?或者,也许从另一方面来看,backpressure给我带来哪些优势来帮助他们管理和处理消费?

最佳答案

实际上,反压显示的是有界缓冲区,Flowable.observeOn具有128个元素的缓冲区,该缓冲区的耗尽速度与dowstream可以吸收的速度一样快。您可以单独增加此缓冲区的大小以处理突发信号源,并且所有反压管理实践仍从1.x开始适用。 Observable.observeOn具有无限制的缓冲区,该缓冲区不断收集元素,您的应用程序可能会用尽内存。

您可以使用Observable例如:

  • 处理GUI事件
  • 处理短序列(总共少于1000个元素)

  • 您可以使用Flowable例如:
  • 冷和非定时源
  • 生成器,类似于源
  • 网络和数据库访问器
  • 08-06 10:22