问题描述
我有一些并行执行的 observable,例如 localObservable
和 networkObservable
.如果 networkObservable
开始发出项目(从这个时候开始,我只需要这些项目),然后丢弃由 localObservable
发出的项目(也许 localObservable
没有还没开始).
I have some set of observable which I am executing in parallel, like localObservable
and networkObservable
. If the networkObservable
starts emit items (from this time, I need only these items), then discard items emitted by localObservable
(maybe localObservable
has not yet started).
Observable<Integer> localObservable =
Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());
推荐答案
你可以这样做:
Observable<Long> networkObservable =
Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.share();
Observable<Long> localObservable =
Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.takeUntil(networkObservable);
Observable.merge(networkObservable, localObservable)
.subscribe(System.out::println);
这将输出:
0 // localObservable
1 // localObservable
0 // networkObservable from here on
1
2
...
takeUntil
将使 localObservable
在 networkObservable
的第一次发射发生时停止并取消订阅,因此合并的 Observable
> 将从 localObservable
发出,只要 networkObservable
没有启动,当它启动时,它会停止从 localObservable
发出并切换到只发出来自 networkObservable
.
takeUntil
will make localObservable
to stop and unsubscribe when the first emission from networkObservable
happened, so the merged Observable
will emit from localObservable
as long networkObservable
didn't started, and when it does, it will stop emitting from localObservable
and switch to emit only from networkObservable
.
这篇关于如果第二个 observable start 发出项目,rxjava 切换 observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!