所以我有这个密码:
public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) {
return Observable.<AbstractXMPPConnection>create(subscriber -> {
try {
AbstractXMPPConnection connection2 = connection.connect();
if (connection2.isConnected()) {
subscriber.onNext(connection2);
subscriber.onCompleted();
}
} catch (SmackException | IOException | XMPPException e) {
e.printStackTrace();
subscriber.onError(e);
}
})
.doOnError(throwable -> LOGI("111", "Connection OnError called"));
}
public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) {
return connect(connection)
.retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer))
.flatMap(pair -> {
if (pair.second == MAX_LOGIN_TRIES)
return Observable.error(pair.first);
return Observable.timer(pair.second, TimeUnit.SECONDS);
}));
}
public void connect() {
assertTrue("To start a connection to the server, you must first call init() method!",
this.connectionConfig != null);
connectionHelper.connectWithRetry(connection)
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<AbstractXMPPConnection>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
LOGI(TAG, "ConnectionHelper Connection onError\n");
/**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */
MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent());
}
@Override
public void onNext(AbstractXMPPConnection connection) {
LOGI(TAG, "ConnectionHelper Connection onNext");
// onConnected();
}
});
}
我有一些关于链式观测的问题。想象一下这个场景,在这个场景中,我有一个可观察到的连接,有时我使用,但我主要使用可观察到的连接。
我的问题是,如果a加上这个:
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
对于
connectWithRetry()
和connect()
?在这种情况下,当我打电话给public void连接并指定一个调度程序,以前的那些被忽略吗?
为什么我会得到
connectWithRetry()
?显式的NetworkOnMainThreadException
在那里,它不应该给我那个错误 最佳答案
我会先解决你的问题。NetworkOnMainThread
意味着将在新线程上观察输出—也就是说,订阅服务器(observeOn(Schedulers.newThread())
)中的代码将在该线程上运行。onComplete/Error/Next
意味着订阅将发生在主线程上-创建的observate(subscribeOn(AndroidSchedulers.mainThread()
etc)中的代码是订阅发生时运行的代码。
因此,只需交换调度程序:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
所以为了回答你的第一个问题,它们并没有被忽略,只是被错误地使用了。希望从中可以看到,如果将类似的调用移到返回可观测值的方法内的链中,将会发生什么:与您已经做的没有什么不同。这些电话只会打到另一个地方。
那么在哪里放置调度程序选择呢?这取决于你。您可以通过在创建您的观察值的方法中不使用
connection.connect()
调用来获得更高的清晰度: connectionHelper.connectWithRetry(connection)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
但是,如果您觉得您无缘无故地到处调用它,则可以将
subscribeOn
调用移到您的方法中:return connect(connection)
.retryWhen(...)
.flatMap(...)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
请注意,这些不必像这样捆绑在一起-您可以在方法中
subscribeOn
,但将subscribeOn
留给任何希望在特定线程上获得结果的调用方。