RxJava2实战--第八章 RxJava的背压

1 背压

产生条件:

  1. 异步,被观察者和观察者处于不同的线程中。
  2. 被观察者发送消息的速度远远快于观察者处理的的数据

解决背压问题的方法:

1.1 过滤限流

  • sample:在一段时间内,只处理最后一个数据。
  • throttleFirst:在一段时间内,只处理第一个数据。
  • debounce:发送一个数据,开始计时,到了规定时间,若没有再发送数据,则开始处理数据,反之重新开始计时。

1.2 打包缓存

  • buffer:将多个事件打包放入一个List中,再一起发射。
  • window:将多个事件打包放入一个Observable中,再一起发射。

1.3 使用背压操作符

2. RxJava2.x的背压策略

2.1 MISSING

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i <1000 ; i++) {
                    emitter.onNext(i);
                }
            }
        },BackpressureStrategy.MISSING)
                .onBackpressureBuffer()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

上面的代码执行的是buffer的背压策略。

2.2 ERROR

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 129; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

在Android中运行以上代码,会立即引起App Crash,引起以下Exception:

W/System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
W/System.err:     at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:438)
        at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:406)

因为Flowable的默认队列是128,所以讲上述代码的129改成128,程序就可以正常运行了。

2.3 BUFFER

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0;; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

上述代码不会导致崩溃,但会引起ANR。

2.4 DROR

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0;i<129; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

在Android中运行这段代码,不会引起Crash,但只会打印出0~127,第128则被丢弃,因为Flowable的内部队列已经满了。

2.5 LATEST

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0;i<1000; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

执行结果:

0
1
...
127
516
519
521
523
526
528
530
...
734
737
740
743
745
748
999

3. 其他

示例:

Flowable.interval(1, TimeUnit.MILLISECONDS)
        .onBackpressureBuffer()
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println(aLong);
            }
        });
01-08 17:54