RxJava2实战---第七章 合并操作符和连接操作符
RxJava的合并操作符:
- startWith():在数据序列的开头增加一项数据。
- merge:将多个Observable合并为一个。
- mergeDelayError():合并多个Observable,让没有错误的Observable都完成后再发射错误通知。
- zip():使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。
- combineLatest():当两个Observable中的任何一个发射一个数据时,通知一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后在发射这个函数的结果。
- join(): 无论何时,如果一个Observable发射了一个数据项,就需要在另一个Observable发射的数据项定义的时间窗口内,将两个Observable发射的数据合并发射。
- switchOnNext():将一个发射Observable的Observable转换成另一个Observable,后者发射这些Observable最近发射的数据。
RxJava的连续操作符,主要是ConnectableObservable所使用的操作符和Observable所使用的操作符。
- ConnectableObservable.connect():指示一个可连续的Observable开始发射数据项。
- Observable.publish():将一个Observable转换为一个可连续的Observable.
- Observable.replay():确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。
- ConnectableObservable.reCount():让一个可连续的Observable表现得像一个普通的Observable。
1. 合并操作符
1.1 startWith
Observable.just("Hello Java","Hello Kotlin","Hello Android")
.startWith("Hello Rx")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
//Lamand表达式
Observable.just("Hello Java","Hello Kotlin","Hello Android")
.startWith("Hello Rx")
.subscribe(s -> System.out.println(s));
执行结果:
Hello Rx
Hello Java
Hello Kotlin
Hello Android
Observable.just("Hello Java","Hello Kotlin","Hello Android")
.startWithArray("Hello Rx","Hello Android")
.startWith("Hello World")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
//Lamand表达式
Observable.just("Hello Java","Hello Kotlin","Hello Android")
.startWithArray("Hello Rx","Hello RN")
.startWith("Hello World")
.subscribe(s -> System.out.println(s));
执行结果:
Hello World
Hello Rx
Hello Android
Hello Java
Hello Kotlin
Hello Android
Observable.just("Hello Java","Hello Kotlin","Hello Android")
.startWithArray("Hello Rx","Hello Android")
.startWith(Observable.just("HellowWorld"))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
//Lamand表达式
Observable.just("Hello Java","Hello Kotlin","Hello Android")
.startWithArray("Hello Rx","Hello RN")
.startWith(Observable.just("HellowWorld"))
.subscribe(s -> System.out.println(s));
执行结果:
HellowWorld
Hello Rx
Hello Android
Hello Java
Hello Kotlin
Hello Android
1.2 concat/concatArray
Observable.concatArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
//lamand表达式
Observable.concatArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
.subscribe(integer -> System.out.println(integer));
执行结果:
1
2
3
4
5
6
1.3 concatArrayDelayError/mergeArrayDelayError
Observable
.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new NullPointerException());
emitter.onNext(3);
}
}),
Observable.just(4,5,6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
System.out.println(throwable.getMessage());
}
});
执行结果:
1
2
4
5
6
java.lang.NullPointerException
at com.loan.rxjavademo.RxjavaTest$3.subscribe(RxjavaTest.java:36)
null
//lamand表达式
Observable.concatArrayDelayError(
Observable.create(emitter -> {
emitter.onNext(1);
emitter.onError(new NullPointerException());
emitter.onNext(2);
emitter.onNext(3);
}),Observable.just(4,5,6))
.subscribe(integer -> System.out.println(integer),throwable -> {
throwable.printStackTrace();
System.out.println(throwable.getMessage());
});
执行结果:
1
4
5
6
java.lang.NullPointerException
null
1.4 merge/mergeArray
Observable.mergeArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
//lamand表达式
Observable.mergeArray(Observable.intervalRange(1,3,0,1,TimeUnit.SECONDS)
,Observable.intervalRange(4,3,0,1,TimeUnit.SECONDS))
.subscribe(integer -> System.out.println(integer));
执行结果:
1
4
2
5
3
6
1.5 mergeWith
Observable.just(1,2,3)
.mergeWith(Observable.just(4,5,6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
执行结果:
1
2
3
4
5
6
1.6 zip
Observable.zip(Observable.just(1, 3, 5), Observable.just(2, 4, 6),
new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
//Lamand表达式
Observable.zip(Observable.just(1,3,5),Observable.just(2,4,6,8,10),
(integer, integer2) -> integer+integer2)
.subscribe(integer -> System.out.println(integer));
执行结果:
3
7
11
1.7 combineLatest
Observable.combineLatest(Observable.just(1, 3, 5), Observable.just(2, 4, 6,8,10),
new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
System.out.println("integer:"+integer+" integer2:"+integer2);
return integer+integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
//Lamand表达式
Observable.combineLatest(Observable.just(1,3,5),Observable.just(2,4,6,8,10),
(integer, integer2) -> integer+integer2)
.subscribe(integer -> System.out.println(integer));
执行结果:
integer:5 integer2:2
7
integer:5 integer2:4
9
integer:5 integer2:6
11
integer:5 integer2:8
13
integer:5 integer2:10
15
1.8 join
join()四个参数的用途:
- Observable:源Observable需要组合的Observable,这里可以称之为目标Observable.
- Function:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了源Observable发射数据的有效期。
- FUnction:接受目标Observable发射来的数据,并返回一个Observable,这个Observable的生命周期决定了目标Observable发射数据的有效期。
- BiFunction:接受从源Observable和目标Observable发射的数据,并将这两个数据祝贺后返回。
Observable<Integer> o1=Observable.just(1,2,3);
Observable<Integer> o2=Observable.just(4,5,6);
o1.join(o2, new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
System.out.println("apply1:"+integer);
return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
}
}, new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
System.out.println("apply2:"+integer);
return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
}
}, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
System.out.println("apply3: integer:"+integer+" intteger2:"+integer2);
return integer+":"+integer2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
执行结果:
apply1:1
apply1:2
apply1:3
apply2:4
apply3: integer:1 intteger2:4
1:4
apply3: integer:2 intteger2:4
2:4
apply3: integer:3 intteger2:4
3:4
apply2:5
apply3: integer:1 intteger2:5
1:5
apply3: integer:2 intteger2:5
2:5
apply3: integer:3 intteger2:5
3:5
apply2:6
apply3: integer:1 intteger2:6
1:6
apply3: integer:2 intteger2:6
2:6
apply3: integer:3 intteger2:6
3:6
对上述代码做一点修改:
Observable<Integer> o1=Observable.just(1,2,3).delay(200,TimeUnit.MILLISECONDS);
Observable<Integer> o2=Observable.just(4,5,6);
o1.join(o2, new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
System.out.println("apply1:"+integer);
return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
}
}, new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
System.out.println("apply2:"+integer);
return Observable.just(integer+"").delay(200,TimeUnit.MILLISECONDS);
}
}, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
System.out.println("apply3: integer:"+integer+" intteger2:"+integer2);
return integer+":"+integer2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
执行结果:
apply2:4
apply2:5
apply2:6
apply1:1
apply3: integer:1 intteger2:4
1:4
apply3: integer:1 intteger2:5
1:5
apply3: integer:1 intteger2:6
1:6
apply1:2
apply3: integer:2 intteger2:4
2:4
apply3: integer:2 intteger2:5
2:5
apply3: integer:2 intteger2:6
2:6
apply1:3
apply3: integer:3 intteger2:4
3:4
apply3: integer:3 intteger2:5
3:5
apply3: integer:3 intteger2:6
3:6
1.9 reduce
示例:累加计算
Observable.just(1,2,3,4,5)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
//Lamanda表达式
Observable.just(1,2,3,4,5)
.reduce((integer, integer2) -> integer+integer2)
.subscribe(integer -> System.out.println(integer));
执行结果:
15
1.10 collect
Observable.just(1,2,3,4,5)
.collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() throws Exception {
return new ArrayList<Integer>();
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> integers, Integer integer) throws Exception {
integers.add(integer);
}
})
.subscribe(new BiConsumer<List<Integer>, Throwable>() {
@Override
public void accept(List<Integer> integers, Throwable throwable) throws Exception {
System.out.println(integers);
}
});
执行结果:
[1, 2, 3, 4, 5]
2. 连接操作符
2.1 connect
2.2 publish
SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss");
Observable observable=Observable.interval(1,TimeUnit.SECONDS).take(6);
ConnectableObservable<Long> connectableObservable=observable.publish();
connectableObservable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println("subscriber1:onNext:"+aLong+" time:"+sdf.format(new Date()));
}
@Override
public void onError(Throwable e) {
System.out.println("subscriber1:error:"+e.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscriber1:onComplete");
}
});
connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println("subscriber2:onNext:"+aLong+" time:"+sdf.format(new Date()));
}
@Override
public void onError(Throwable e) {
System.out.println("subscriber2:error:"+e.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscriber2:onComplete");
}
});
connectableObservable.connect();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
执行结果:
subscriber1:onNext:2 time:16:25:31
subscriber1:onNext:3 time:16:25:32
subscriber2:onNext:3 time:16:25:32
subscriber1:onNext:4 time:16:25:33
subscriber2:onNext:4 time:16:25:33
subscriber1:onNext:5 time:16:25:34
subscriber2:onNext:5 time:16:25:34
subscriber1:onComplete
subscriber2:onComplete
2.3 replay
SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss");
Observable observable=Observable.interval(1,TimeUnit.SECONDS).take(6);
ConnectableObservable<Long> connectableObservable=observable.replay();
connectableObservable.connect();
connectableObservable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println("subscriber1:onNext:"+aLong+" time:"+sdf.format(new Date()));
}
@Override
public void onError(Throwable e) {
System.out.println("subscriber1:error:"+e.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscriber1:onComplete");
}
});
connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println("subscriber2:onNext:"+aLong+" time:"+sdf.format(new Date()));
}
@Override
public void onError(Throwable e) {
System.out.println("subscriber2:error:"+e.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscriber2:onComplete");
}
});
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
执行结果:
subscriber1:onNext:0 time:16:39:16
subscriber1:onNext:1 time:16:39:17
subscriber1:onNext:2 time:16:39:18
subscriber2:onNext:0 time:16:39:18
subscriber2:onNext:1 time:16:39:18
subscriber2:onNext:2 time:16:39:18
subscriber1:onNext:3 time:16:39:19
subscriber2:onNext:3 time:16:39:19
subscriber1:onNext:4 time:16:39:20
subscriber2:onNext:4 time:16:39:20
subscriber1:onNext:5 time:16:39:21
subscriber2:onNext:5 time:16:39:21
subscriber1:onComplete
subscriber2:onComplete
2.4 refCount
SimpleDateFormat sdf=new SimpleDateFormat("HH:mm:ss");
Observable observable=Observable.interval(1,TimeUnit.SECONDS).take(6);
ConnectableObservable<Long> connectableObservable=observable.publish();
Observable obsRefCount=connectableObservable.refCount();
observable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println("subscriber1:onNext:"+aLong+" time:"+sdf.format(new Date()));
}
@Override
public void onError(Throwable e) {
System.out.println("subscriber1:error:"+e.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscriber1:onComplete");
}
});
observable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println("subscriber2:onNext:"+aLong+" time:"+sdf.format(new Date()));
}
@Override
public void onError(Throwable e) {
System.out.println("subscriber2:error:"+e.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscriber2:onComplete");
}
});
obsRefCount.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println("subscriber3:onNext:"+aLong+" time:"+sdf.format(new Date()));
}
@Override
public void onError(Throwable e) {
System.out.println("subscriber3:error:"+e.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscriber3:onComplete");
}
});
obsRefCount.delaySubscription(3, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.println("subscriber4:onNext:"+aLong+" time:"+sdf.format(new Date()));
}
@Override
public void onError(Throwable e) {
System.out.println("subscriber4:error:"+e.getMessage());
}
@Override
public void onComplete() {
System.out.println("subscriber4:onComplete");
}
});
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
执行结果:
subscriber1:onNext:0 time:16:30:30
subscriber2:onNext:0 time:16:30:30
subscriber3:onNext:0 time:16:30:30
subscriber2:onNext:1 time:16:30:31
subscriber1:onNext:1 time:16:30:31
subscriber3:onNext:1 time:16:30:31
subscriber1:onNext:2 time:16:30:32
subscriber2:onNext:2 time:16:30:32
subscriber3:onNext:2 time:16:30:32
subscriber1:onNext:3 time:16:30:33
subscriber2:onNext:3 time:16:30:33
subscriber3:onNext:3 time:16:30:33
subscriber4:onNext:3 time:16:30:33
subscriber1:onNext:4 time:16:30:34
subscriber2:onNext:4 time:16:30:34
subscriber3:onNext:4 time:16:30:34
subscriber4:onNext:4 time:16:30:34
subscriber2:onNext:5 time:16:30:35
subscriber1:onNext:5 time:16:30:35
subscriber1:onComplete
subscriber2:onComplete
subscriber3:onNext:5 time:16:30:35
subscriber4:onNext:5 time:16:30:35
subscriber3:onComplete
subscriber4:onComplete