我真的很喜欢rxjava,它是一个很棒的工具,但有时很难理解它是如何工作的。
在我们的android项目中,我们使用了rxjava的reformation,下面是一个用例:
我需要轮询服务器,在服务器执行某些任务时,重试之间会有一些延迟。服务器完成后,我必须交付结果。所以我已经用rxjava成功地做到了,下面是代码片段:
我用“滑雪”和“重复时间”
Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
.skipWhile(new Func1<CheckJobResponse, Boolean>() {
@Override
public Boolean call(CheckJobResponse checkJobResponse) {
boolean shouldSkip = false;
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());
switch (checkJobResponse.getJobStatus()){
case CheckJobResponse.PROCESSING:
shouldSkip = true;
break;
case CheckJobResponse.DONE:
case CheckJobResponse.ERROR:
shouldSkip = false;
break;
}
if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);
return shouldSkip;
}
})
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
return observable.delay(1, TimeUnit.SECONDS);
}
}).subscribe(new Subscriber<CheckJobResponse>(){
@Override
public void onNext(CheckJobResponse response) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);
}
@Override
public void onError(BaseError error) {
if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();
}
@Override
public void onCompleted() {
if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
}
});
代码运行良好:
当服务器响应作业正在处理时,我从“skipwhile”链返回“true”,原始的observate将等待1秒,然后再次执行http请求。
此过程重复,直到我从“Skipwhile”链返回“false”。
有几件事我不明白:
我在“skipwhile”的文档中看到,在我从它的“call”方法返回“false”之前,它不会从原始可观测发出任何东西(onerror、onnext、oncomplete)。所以,如果它没有发射出任何东西,为什么“repeatWhen”观测到它在做它的工作?它等待一秒钟,然后再次运行请求。是谁发射的?
第二个问题是:为什么从“repeatWhen”中观察到的并不是永远运行的,我的意思是为什么当我从“skipwhile”中返回“false”时它会停止重复?如果返回“false”,则在订阅服务器中成功获取onnext。
在“repeatwhile”的文档中,它说最终我在我的订阅者中得到了一个“oncomplete”的调用,但是“oncomplete”从来没有被调用过。
如果我改变了链接“skipwhile”和“repeatwhen”的顺序,就没什么区别了。为什么?
我知道rxjava是开源的,我可以直接阅读代码,但是正如我所说的-这真的很难理解。
谢谢。
最佳答案
我以前从未与repeatWhen
合作过,但这个问题让我好奇,所以我做了一些研究。skipWhile
确实会发出onError
和onCompleted
,即使在此之前它从未返回true
。因此,每次repeatWhen
发射checkJob()
时都会调用onCompleted
。这回答了问题1。
其余的问题都是基于错误的假设。您的订阅将永远运行,因为您的repeatWhen
永不终止。这是因为repeatWhen
是一个比你意识到的更复杂的野兽。当它从源头得到Observable
时,其中的null
就会发出onCompleted
。如果您接受它并返回onCompleted
则它结束,否则如果您发出任何消息,它将重试。因为delay
只是接收一个发射并延迟它,所以它总是再次发射null
。因此,它不断地重新订阅。
那么,对2的答案是它永远在运行;您可能正在执行此代码之外的其他操作来取消订阅。对于3,您永远不会得到onCompleted
,因为它永远不会完成。对于4,顺序无关紧要,因为你在无限期地重复。
现在的问题是,你如何得到正确的行为?它非常简单,只需使用takeUntil
而不是skipWhile
。这样,您就可以不断重复,直到得到所需的结果,从而在希望流结束时终止流。
下面是一个代码示例:
Observable<Boolean> source = ...; // Something that eventually emits true
source
.repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
.takeUntil(result -> result)
.filter(result -> result)
.subscribe(
res -> System.out.println("onNext(" + res + ")"),
err -> System.out.println("onError()"),
() -> System.out.println("onCompleted()")
);
在本例中,
source
发出布尔值。我每1秒重复一次,直到光源发出true
。我一直服用直到result
是true
为止。我过滤掉所有false
的通知,所以订户在true
之前不会收到它们。