本文介绍了如果第二个 observable start 发出项目,rxjava 切换 observable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一些并行执行的 observable,例如 localObservablenetworkObservable.如果 networkObservable 开始发出项目(从这个时候开始,我只需要这些项目),然后丢弃由 localObservable 发出的项目(也许 localObservable 没有还没开始).

I have some set of observable which I am executing in parallel, like localObservable and networkObservable. If the networkObservable starts emit items (from this time, I need only these items), then discard items emitted by localObservable (maybe localObservable has not yet started).

Observable<Integer> localObservable =
            Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
            Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());

推荐答案

你可以这样做:

 Observable<Long> networkObservable =
            Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .share();
    Observable<Long> localObservable =
            Observable.interval(500, TimeUnit.MILLISECONDS)                       
                    .subscribeOn(Schedulers.io())
                    .takeUntil(networkObservable);

    Observable.merge(networkObservable, localObservable)
            .subscribe(System.out::println);

这将输出:

0 // localObservable 
1 // localObservable 
0 // networkObservable from here on
1
2
...

takeUntil 将使 localObservablenetworkObservable 的第一次发射发生时停止并取消订阅,因此合并的 Observable> 将从 localObservable 发出,只要 networkObservable 没有启动,当它启动时,它会停止从 localObservable 发出并切换到只发出来自 networkObservable.

takeUntil will make localObservable to stop and unsubscribe when the first emission from networkObservable happened, so the merged Observable will emit from localObservable as long networkObservable didn't started, and when it does, it will stop emitting from localObservable and switch to emit only from networkObservable.

这篇关于如果第二个 observable start 发出项目,rxjava 切换 observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-20 10:13