RxJava2实战---第七章 合并操作符和连接操作符

RxJava的合并操作符:

  • startWith():在数据序列的开头增加一项数据。
  • merge:将多个Observable合并为一个。
  • mergeDelayError():合并多个Observable,让没有错误的Observable都完成后再发射错误通知。
  • zip():使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。
  • combineLatest():当两个Observable中的任何一个发射一个数据时,通知一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后在发射这个函数的结果。
  • join(): 无论何时,如果一个Observable发射了一个数据项,就需要在另一个Observable发射的数据项定义的时间窗口内,将两个Observable发射的数据合并发射。
  • switchOnNext():将一个发射Observable的Observable转换成另一个Observable,后者发射这些Observable最近发射的数据。

RxJava的连续操作符,主要是ConnectableObservable所使用的操作符和Observable所使用的操作符。

  • ConnectableObservable.connect():指示一个可连续的Observable开始发射数据项。
  • Observable.publish():将一个Observable转换为一个可连续的Observable.
  • Observable.replay():确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。
  • ConnectableObservable.reCount():让一个可连续的Observable表现得像一个普通的Observable。

1. 合并操作符

1.1 startWith

        Observable.just("Hello Java","Hello Kotlin","Hello Android")
                .startWith("Hello Rx")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });

        //Lamand表达式
        Observable.just("Hello Java","Hello Kotlin","Hello Android")
                .startWith("Hello Rx")
                .subscribe(s -> System.out.println(s));

    执行结果:
    Hello Rx
    Hello Java
    Hello Kotlin
    Hello Android
        Observable.just("Hello Java","Hello Kotlin","Hello Android")
                .startWithArray("Hello Rx","Hello Android")
                .startWith("Hello World")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });

        //Lamand表达式
        Observable.just("Hello Java","Hello Kotlin","Hello Android")
                .startWithArray("Hello Rx","Hello RN")
                .startWith("Hello World")
                .subscribe(s -> System.out.println(s));

    执行结果:
    Hello World
    Hello Rx
    Hello Android
    Hello Java
    Hello Kotlin
    Hello Android
        Observable.just("Hello Java","Hello Kotlin","Hello Android")
                .startWithArray("Hello Rx","Hello Android")
                .startWith(Observable.just("HellowWorld"))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });

        //Lamand表达式
        Observable.just("Hello Java","Hello Kotlin","Hello Android")
                .startWithArray("Hello Rx","Hello RN")
                .startWith(Observable.just("HellowWorld"))
                .subscribe(s -> System.out.println(s));

    执行结果:
    HellowWorld
    Hello Rx
    Hello Android
    Hello Java
    Hello Kotlin
    Hello Android

1.2 concat/concatArray

        Observable.concatArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
                ,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });

        //lamand表达式
        Observable.concatArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
                ,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
                .subscribe(integer -> System.out.println(integer));

    执行结果:
    1
    2
    3
    4
    5
    6

1.3 concatArrayDelayError/mergeArrayDelayError

        Observable
                .concatArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                            emitter.onNext(1);
                            emitter.onNext(2);
                            emitter.onError(new NullPointerException());
                            emitter.onNext(3);
                        }
                    }),
                Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        throwable.printStackTrace();
                        System.out.println(throwable.getMessage());
                    }
                });

执行结果:
    1
    2
    4
    5
    6
    java.lang.NullPointerException
        at com.loan.rxjavademo.RxjavaTest$3.subscribe(RxjavaTest.java:36)
    null


        //lamand表达式
        Observable.concatArrayDelayError(
                Observable.create(emitter -> {
                    emitter.onNext(1);
                    emitter.onError(new NullPointerException());
                    emitter.onNext(2);
                    emitter.onNext(3);
                }),Observable.just(4,5,6))
                .subscribe(integer -> System.out.println(integer),throwable -> {
                    throwable.printStackTrace();
                    System.out.println(throwable.getMessage());
                });

    执行结果:
    1
    4
    5
    6
    java.lang.NullPointerException
    null

1.4 merge/mergeArray

        Observable.mergeArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
                ,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });


        //lamand表达式
         Observable.mergeArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
                ,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
                .subscribe(integer -> System.out.println(integer));

        执行结果:
        1
        4
        2
        5
        3
        6

1.5 mergeWith

        Observable.just(1,2,3)
                .mergeWith(Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
        执行结果:
        1
        2
        3
        4
        5
        6

1.6 zip

        Observable.zip(Observable.just(1, 3, 5), Observable.just(2, 4, 6),
                new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer+integer2;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

        //Lamand表达式
        Observable.zip(Observable.just(1,3,5),Observable.just(2,4,6,8,10),
                (integer, integer2) -> integer+integer2)
                .subscribe(integer -> System.out.println(integer));

        执行结果:
        3
        7
        11

1.7 combineLatest

        Observable.combineLatest(Observable.just(1, 3, 5), Observable.just(2, 4, 6,8,10),
                new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        System.out.println("integer:"+integer+"   integer2:"+integer2);
                        return integer+integer2;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

        //Lamand表达式
        Observable.combineLatest(Observable.just(1,3,5),Observable.just(2,4,6,8,10),
                (integer, integer2) -> integer+integer2)
                .subscribe(integer -> System.out.println(integer));

    执行结果:
    integer:5   integer2:2
    7
    integer:5   integer2:4
    9
    integer:5   integer2:6
    11
    integer:5   integer2:8
    13
    integer:5   integer2:10
    15

1.8 join

join()四个参数的用途:

  • Observable:源Observable需要组合的Observable,这里可以称之为目标Observable.
  • Function:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了源Observable发射数据的有效期。
  • FUnction:接受目标Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了目标Observable发射数据的有效期。
  • BiFunction:接受从源Observable和目标Observable发射的数据,并将这两个数据祝贺后返回。
        Observable<Integer> o1=Observable.just(1,2,3);
        Observable<Integer> o2=Observable.just(4,5,6);

        o1.join(o2, new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                System.out.println("apply1:"+integer);
                return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
            }
        }, new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                System.out.println("apply2:"+integer);
                return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
            }
        }, new BiFunction<Integer, Integer, String>() {
            @Override
            public String apply(Integer integer, Integer integer2) throws Exception {
                System.out.println("apply3:  integer:"+integer+"  intteger2:"+integer2);
                return integer+":"+integer2;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });


        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        执行结果:
        apply1:1
        apply1:2
        apply1:3
        apply2:4
        apply3:  integer:1  intteger2:4
        1:4
        apply3:  integer:2  intteger2:4
        2:4
        apply3:  integer:3  intteger2:4
        3:4
        apply2:5
        apply3:  integer:1  intteger2:5
        1:5
        apply3:  integer:2  intteger2:5
        2:5
        apply3:  integer:3  intteger2:5
        3:5
        apply2:6
        apply3:  integer:1  intteger2:6
        1:6
        apply3:  integer:2  intteger2:6
        2:6
        apply3:  integer:3  intteger2:6
        3:6

对上述代码做一点修改:

        Observable<Integer> o1=Observable.just(1,2,3).delay(200,TimeUnit.MILLISECONDS);
        Observable<Integer> o2=Observable.just(4,5,6);

        o1.join(o2, new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                System.out.println("apply1:"+integer);
                return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
            }
        }, new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                System.out.println("apply2:"+integer);
                return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
            }
        }, new BiFunction<Integer, Integer, String>() {
            @Override
            public String apply(Integer integer, Integer integer2) throws Exception {
                System.out.println("apply3:  integer:"+integer+"  intteger2:"+integer2);
                return integer+":"+integer2;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });


        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        执行结果:
        apply2:4
        apply2:5
        apply2:6
        apply1:1
        apply3:  integer:1  intteger2:4
        1:4
        apply3:  integer:1  intteger2:5
        1:5
        apply3:  integer:1  intteger2:6
        1:6
        apply1:2
        apply3:  integer:2  intteger2:4
        2:4
        apply3:  integer:2  intteger2:5
        2:5
        apply3:  integer:2  intteger2:6
        2:6
        apply1:3
        apply3:  integer:3  intteger2:4
        3:4
        apply3:  integer:3  intteger2:5
        3:5
        apply3:  integer:3  intteger2:6
        3:6

1.9 reduce

示例:累加计算

        Observable.just(1,2,3,4,5)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer+integer2;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });


        //Lamanda表达式
        Observable.just(1,2,3,4,5)
                .reduce((integer, integer2) -> integer+integer2)
                .subscribe(integer -> System.out.println(integer));

        执行结果:
        15

1.10 collect

        Observable.just(1,2,3,4,5)
                .collect(new Callable<List<Integer>>() {

                    @Override
                    public List<Integer> call() throws Exception {
                        return new ArrayList<Integer>();
                    }
                }, new BiConsumer<List<Integer>, Integer>() {
                    @Override
                    public void accept(List<Integer> integers, Integer integer) throws Exception {
                        integers.add(integer);
                    }
                })
                .subscribe(new BiConsumer<List<Integer>, Throwable>() {
                    @Override
                    public void accept(List<Integer> integers, Throwable throwable) throws Exception {
                        System.out.println(integers);
                    }
                });


           执行结果:
           [1, 2, 3, 4, 5]

2. 连接操作符

2.1 connect

2.2 publish


        SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss");

        Observable observable=Observable.interval(1,TimeUnit.SECONDS).take(6);

        ConnectableObservable<Long> connectableObservable=observable.publish();

        connectableObservable.subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("subscriber1:onNext:"+aLong+"  time:"+sdf.format(new Date()));
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("subscriber1:error:"+e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("subscriber1:onComplete");
            }
        });

        connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("subscriber2:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("subscriber2:error:"+e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("subscriber2:onComplete");
                    }
                });

        connectableObservable.connect();



        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    执行结果:
    subscriber1:onNext:2  time:16:25:31
    subscriber1:onNext:3  time:16:25:32
    subscriber2:onNext:3  time:16:25:32
    subscriber1:onNext:4  time:16:25:33
    subscriber2:onNext:4  time:16:25:33
    subscriber1:onNext:5  time:16:25:34
    subscriber2:onNext:5  time:16:25:34
    subscriber1:onComplete
    subscriber2:onComplete

2.3 replay

        SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss");

        Observable observable=Observable.interval(1,TimeUnit.SECONDS).take(6);

        ConnectableObservable<Long> connectableObservable=observable.replay();
        connectableObservable.connect();

        connectableObservable.subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("subscriber1:onNext:"+aLong+"  time:"+sdf.format(new Date()));
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("subscriber1:error:"+e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("subscriber1:onComplete");
            }
        });



        connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("subscriber2:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("subscriber2:error:"+e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("subscriber2:onComplete");
                    }
                });


        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        执行结果:
        subscriber1:onNext:0  time:16:39:16
        subscriber1:onNext:1  time:16:39:17
        subscriber1:onNext:2  time:16:39:18
        subscriber2:onNext:0  time:16:39:18
        subscriber2:onNext:1  time:16:39:18
        subscriber2:onNext:2  time:16:39:18
        subscriber1:onNext:3  time:16:39:19
        subscriber2:onNext:3  time:16:39:19
        subscriber1:onNext:4  time:16:39:20
        subscriber2:onNext:4  time:16:39:20
        subscriber1:onNext:5  time:16:39:21
        subscriber2:onNext:5  time:16:39:21
        subscriber1:onComplete
        subscriber2:onComplete

2.4 refCount

        SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss");

        Observable observable=Observable.interval(1,TimeUnit.SECONDS).take(6);

        ConnectableObservable<Long> connectableObservable=observable.publish();

        Observable obsRefCount=connectableObservable.refCount();

        observable.subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("subscriber1:onNext:"+aLong+"  time:"+sdf.format(new Date()));
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("subscriber1:error:"+e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("subscriber1:onComplete");
            }
        });
        observable.subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("subscriber2:onNext:"+aLong+"  time:"+sdf.format(new Date()));
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("subscriber2:error:"+e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("subscriber2:onComplete");
            }
        });
        obsRefCount.subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("subscriber3:onNext:"+aLong+"  time:"+sdf.format(new Date()));
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("subscriber3:error:"+e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("subscriber3:onComplete");
            }
        });

        obsRefCount.delaySubscription(3, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("subscriber4:onNext:"+aLong+"  time:"+sdf.format(new Date()));
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("subscriber4:error:"+e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("subscriber4:onComplete");
                    }
                });


        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        执行结果:
        subscriber1:onNext:0  time:16:30:30
        subscriber2:onNext:0  time:16:30:30
        subscriber3:onNext:0  time:16:30:30
        subscriber2:onNext:1  time:16:30:31
        subscriber1:onNext:1  time:16:30:31
        subscriber3:onNext:1  time:16:30:31
        subscriber1:onNext:2  time:16:30:32
        subscriber2:onNext:2  time:16:30:32
        subscriber3:onNext:2  time:16:30:32
        subscriber1:onNext:3  time:16:30:33
        subscriber2:onNext:3  time:16:30:33
        subscriber3:onNext:3  time:16:30:33
        subscriber4:onNext:3  time:16:30:33
        subscriber1:onNext:4  time:16:30:34
        subscriber2:onNext:4  time:16:30:34
        subscriber3:onNext:4  time:16:30:34
        subscriber4:onNext:4  time:16:30:34
        subscriber2:onNext:5  time:16:30:35
        subscriber1:onNext:5  time:16:30:35
        subscriber1:onComplete
        subscriber2:onComplete
        subscriber3:onNext:5  time:16:30:35
        subscriber4:onNext:5  time:16:30:35
        subscriber3:onComplete
        subscriber4:onComplete
01-05 06:00