我有两个Observable,我们称它们为PeanutButterJelly。我想将它们组合为Sandwich Observable。我可以使用:

Observable<PeanutButter> peanutButterObservable = ...;
Observable<Jelly> jellyObservable = ...;
Observable<Sandwich> sandwichObservable = Observable.combineLatest(
    peanutButterObservable,
    jellyObservable,
    (pb, j) -> makeSandwich(pb, j))

问题是RX在发出第一个组合的PeanutButter之前先等待第一个Jelly和第一个Sandwich发出,但是Jelly可能永远不会被发出,这意味着我永远也不会得到第一个Sandwich

我想将这两个提要合并在一起,以便从任何一个提要中发出第一个项目时就发出一个合并的项目,而无论另一个提要是否尚未发出任何东西,我该如何在RxJava中做到这一点?

最佳答案

一种可能的方法是在订阅时使用startWith运算符从每个流中触发已知值的发射。这样,如果其中一个流发出值,combineLatest()将触发。您只需要注意onNext使用者中的初始/信号值即可。

像这样的东西...:

@Test
public void sandwiches() {
    final Observable<String> peanutButters = Observable.just("chunky", "smooth")
        .startWith("--initial--");

    final Observable<String> jellies = Observable.just("strawberry", "blackberry", "raspberry")
        .startWith("--initial--");

    Observable.combineLatest(peanutButters, jellies, (peanutButter, jelly) -> {
        return new Pair<>(peanutButter, jelly);
    })
    .subscribe(
        next -> {
            final String peanutButter = next.getFirst();
            final String jelly = next.getSecond();

            if(peanutButter.equals("--initial--") && jelly.equals("--initial--")) {
                // initial emissions
            } else if(peanutButter.equals("--initial--")) {
                // jelly emission
            } else if(jelly.equals("--initial--")) {
                // peanut butter emission
            } else {
                // peanut butter + jelly emissions
            }
        },
        error -> {
            System.err.println("## onError(" + error.getMessage() + ")");
        },
        () -> {
            System.out.println("## onComplete()");
        }
    );
}

07-24 09:19