本文介绍了RxJava 在多个订阅者之间共享一个 Observable 的发射的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下问题:

我有一个 observable 正在做一些工作,但其他 observable 需要该 observable 的输出才能工作.我尝试多次订阅同一个 observable,但在日志中我看到原始 observable 被多次启动.

I have a observable that is doing some work, but other observables need the output of that observable to work. I have tried to subscribe several times on the same observable, but inside the log i see that the original observable is initiated multiple times.

这就是我创建对象的可观察对象:

thats my observable thats create the object:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
            if (mApi == null) {
                //do some work
            }
            subscriber.onNext(mApi);
            subscriber.unsubscribe();
        })

这就是我需要对象的 observable

thats my observable that needs the object

loadApi().flatMap(api -> api....()));

我正在使用

.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread())
                .unsubscribeOn(Schedulers.io()

在所有可观察对象上.

推荐答案

我不确定我是否正确理解了您的问题,但我认为您正在寻找一种方法来在多个订阅者之间共享 observable 的排放.有几种方法可以做到这一点.一方面,您可以像这样使用 Connectable Observable:

I'm not sure that I understood your question correctly, but I figure you're looking for a way to share the emissions of an observable between several subscribers. There are several ways of doing this. For one, you could use a Connectable Observable like so:

ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
obs.subscribe(item -> System.out.println("Sub A: " + item));
obs.subscribe(item -> System.out.println("Sub B: " + item));
obs.connect(); //Now the source observable starts emitting items

输出:

Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3

或者,您可以使用 PublishSubject:

PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject
subject.subscribe(item -> System.out.println("Sub B: " + item));
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable

输出:

Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3

这两个示例都是单线程的,但您可以轻松添加 observeOn 或 subscirbeOn 调用以使其异步.

Both of these examples are single threaded, but you can easily add observeOn or subscirbeOn calls to make them async.

这篇关于RxJava 在多个订阅者之间共享一个 Observable 的发射的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-02 04:28