我有两个Observable
,我们称它们为PeanutButter
和Jelly
。我想将它们组合为Sandwich
Observable
。我可以使用:
Observable<PeanutButter> peanutButterObservable = ...;
Observable<Jelly> jellyObservable = ...;
Observable<Sandwich> sandwichObservable = Observable.combineLatest(
peanutButterObservable,
jellyObservable,
(pb, j) -> makeSandwich(pb, j))
问题是RX在发出第一个组合的
PeanutButter
之前先等待第一个Jelly
和第一个Sandwich
发出,但是Jelly
可能永远不会被发出,这意味着我永远也不会得到第一个Sandwich
。我想将这两个提要合并在一起,以便从任何一个提要中发出第一个项目时就发出一个合并的项目,而无论另一个提要是否尚未发出任何东西,我该如何在RxJava中做到这一点?
最佳答案
一种可能的方法是在订阅时使用startWith
运算符从每个流中触发已知值的发射。这样,如果其中一个流发出值,combineLatest()
将触发。您只需要注意onNext
使用者中的初始/信号值即可。
像这样的东西...:
@Test
public void sandwiches() {
final Observable<String> peanutButters = Observable.just("chunky", "smooth")
.startWith("--initial--");
final Observable<String> jellies = Observable.just("strawberry", "blackberry", "raspberry")
.startWith("--initial--");
Observable.combineLatest(peanutButters, jellies, (peanutButter, jelly) -> {
return new Pair<>(peanutButter, jelly);
})
.subscribe(
next -> {
final String peanutButter = next.getFirst();
final String jelly = next.getSecond();
if(peanutButter.equals("--initial--") && jelly.equals("--initial--")) {
// initial emissions
} else if(peanutButter.equals("--initial--")) {
// jelly emission
} else if(jelly.equals("--initial--")) {
// peanut butter emission
} else {
// peanut butter + jelly emissions
}
},
error -> {
System.err.println("## onError(" + error.getMessage() + ")");
},
() -> {
System.out.println("## onComplete()");
}
);
}