我有一个我作为单例持有的以下类(class):
public class SessionStore {
Subject<Session, Session> subject;
public SessionStore() {
subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
}
public void set(Session session) {
subject.onNext(session);
}
public Observable<UserSession> observe() {
return subject.distinctUntilChanged();
}
}
在 Activity 中,我观察 session 并对每个更改执行网络操作:
private Subscription init() {
return sessionStore
.observe()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Session, Observable<Object>>() {
@Override
public Observable<Object> call(Session session) {
return retrofitService.getAThing();
}
})
.subscribe(...);
}
当我订阅 session 存储时,主题立即在
io()
上发出,因为它是 BehaviourSubject
并且订阅者在 mainThread()
上执行。当我已经订阅了
sessionStore.set(new AnotherSession())
时,问题就出现了。 IMO 这应该在 init()
调度程序上执行 io()
中定义的流。然而,发生的情况是该流在调用 subject.onNext()
的同一线程上执行。当我在 NetworkOnMainThreadException
中进行网络操作时,结果为 flatMap()
。我是否理解错误?我会以这种方式滥用它们吗?那么正确的解决方案是什么?
我还尝试在
Observable.fromEmitter()
方法中用 observe()
替换整个主题方法,但令人惊讶的是输出完全相同。 最佳答案
请查看《Reactive Programming with RxJava》一书中的以下部分
默认情况下,在 Subject 上调用 onNext() 会直接传播到所有 Observer 的 onNext() 回调方法。这些方法共享相同的名称并不奇怪。在某种程度上,在 Subject 上调用 onNext() 间接地在每个订阅者上调用 onNext()。
让我们回顾一下:
如果您对来自 Thread-1 的 Subject 调用 onNext,它将调用来自 Thread-1 的订阅者的 onNext。 onSubscribe 将被丢弃。
所以首先要做的是:
订阅将在哪个线程上发生:
retrofitService.getAThing()
我只是猜测,并说它是调用线程。这将是observeOn 中描述的线程,也就是Android-UI-Loop。
根据调度程序的指定,observeOn 下的每个值都将从 Thread-a 转移到 Thread-b。在 UI-Loop 上的 observeOn 应该在订阅之前发生。将在订阅中接收的每个值都将在 UI-Loop 上,这不会阻塞 UI 线程或以异常结束。
Pease 看看示例代码和输出:
class SessionStore {
private Subject<String, String> subject;
public SessionStore() {
subject = BehaviorSubject.create("wurst").toSerialized();
}
public void set(String session) {
subject.onNext(session);
}
public Observable<String> observe() {
return subject
.asObservable()
.doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
.distinctUntilChanged();
}
}
@Test
public void name() throws Exception {
// init
SessionStore sessionStore = new SessionStore();
TestSubscriber testSubscriber = new TestSubscriber();
Subscription subscribe = sessionStore
.observe()
.flatMap(s -> {
return Observable.fromCallable(() -> {
System.out.println("flatMap Thread:: " + Thread.currentThread());
return s;
}).subscribeOn(Schedulers.io());
})
.doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
.observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
.subscribe(testSubscriber); // Do UI-Stuff in subscribe
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("123");
}).start();
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("345");
}).start();
boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS);
Assert.assertTrue(b);
}
输出::
Receiving value on Thread:: Thread[main,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
set on Thread:: Thread[Thread-1,5,main]
set on Thread:: Thread[Thread-0,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
关于android - RxJava 主题在不正确的调度程序上发出,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40459689/