问题描述
我有以下问题:
我有一个 observable 正在做一些工作,但其他 observable 需要该 observable 的输出才能工作.我尝试多次订阅同一个 observable,但在日志中我看到原始 observable 被多次启动.
I have a observable that is doing some work, but other observables need the output of that observable to work. I have tried to subscribe several times on the same observable, but inside the log i see that the original observable is initiated multiple times.
这就是我创建对象的可观察对象:
thats my observable thats create the object:
Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
if (mApi == null) {
//do some work
}
subscriber.onNext(mApi);
subscriber.unsubscribe();
})
这就是我需要对象的 observable
thats my observable that needs the object
loadApi().flatMap(api -> api....()));
我正在使用
.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread())
.unsubscribeOn(Schedulers.io()
在所有可观察对象上.
推荐答案
我不确定我是否正确理解了您的问题,但我认为您正在寻找一种方法来在多个订阅者之间共享 observable 的排放.有几种方法可以做到这一点.一方面,您可以像这样使用 Connectable Observable:
I'm not sure that I understood your question correctly, but I figure you're looking for a way to share the emissions of an observable between several subscribers. There are several ways of doing this. For one, you could use a Connectable Observable like so:
ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
obs.subscribe(item -> System.out.println("Sub A: " + item));
obs.subscribe(item -> System.out.println("Sub B: " + item));
obs.connect(); //Now the source observable starts emitting items
输出:
Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
或者,您可以使用 PublishSubject:
PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject
subject.subscribe(item -> System.out.println("Sub B: " + item));
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable
输出:
Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
这两个示例都是单线程的,但您可以轻松添加 observeOn 或 subscirbeOn 调用以使其异步.
Both of these examples are single threaded, but you can easily add observeOn or subscirbeOn calls to make them async.
这篇关于RxJava 在多个订阅者之间共享一个 Observable 的发射的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!