我真的很喜欢rxjava,它是一个很棒的工具,但有时很难理解它是如何工作的。
在我们的android项目中,我们使用了rxjava的reformation,下面是一个用例:
我需要轮询服务器,在服务器执行某些任务时,重试之间会有一些延迟。服务器完成后,我必须交付结果。所以我已经用rxjava成功地做到了,下面是代码片段:
我用“滑雪”和“重复时间”

Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
        .skipWhile(new Func1<CheckJobResponse, Boolean>() {

            @Override
            public Boolean call(CheckJobResponse checkJobResponse) {
                boolean shouldSkip = false;

                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());

                switch (checkJobResponse.getJobStatus()){
                    case CheckJobResponse.PROCESSING:
                        shouldSkip = true;
                        break;
                    case CheckJobResponse.DONE:
                    case CheckJobResponse.ERROR:
                        shouldSkip = false;
                        break;
                }
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);

                return shouldSkip;
            }
        })
        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Void> observable) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
                return observable.delay(1, TimeUnit.SECONDS);
            }
        }).subscribe(new Subscriber<CheckJobResponse>(){
            @Override
            public void onNext(CheckJobResponse response) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);

            }

            @Override
            public void onError(BaseError error) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
                Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();

            }

            @Override
            public void onCompleted() {
                if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
            }
        });

代码运行良好:
当服务器响应作业正在处理时,我从“skipwhile”链返回“true”,原始的observate将等待1秒,然后再次执行http请求。
此过程重复,直到我从“Skipwhile”链返回“false”。
有几件事我不明白:
我在“skipwhile”的文档中看到,在我从它的“call”方法返回“false”之前,它不会从原始可观测发出任何东西(onerror、onnext、oncomplete)。所以,如果它没有发射出任何东西,为什么“repeatWhen”观测到它在做它的工作?它等待一秒钟,然后再次运行请求。是谁发射的?
第二个问题是:为什么从“repeatWhen”中观察到的并不是永远运行的,我的意思是为什么当我从“skipwhile”中返回“false”时它会停止重复?如果返回“false”,则在订阅服务器中成功获取onnext。
在“repeatwhile”的文档中,它说最终我在我的订阅者中得到了一个“oncomplete”的调用,但是“oncomplete”从来没有被调用过。
如果我改变了链接“skipwhile”和“repeatwhen”的顺序,就没什么区别了。为什么?
我知道rxjava是开源的,我可以直接阅读代码,但是正如我所说的-这真的很难理解。
谢谢。

最佳答案

我以前从未与repeatWhen合作过,但这个问题让我好奇,所以我做了一些研究。
skipWhile确实会发出onErroronCompleted,即使在此之前它从未返回true。因此,每次repeatWhen发射checkJob()时都会调用onCompleted。这回答了问题1。
其余的问题都是基于错误的假设。您的订阅将永远运行,因为您的repeatWhen永不终止。这是因为repeatWhen是一个比你意识到的更复杂的野兽。当它从源头得到Observable时,其中的null就会发出onCompleted。如果您接受它并返回onCompleted则它结束,否则如果您发出任何消息,它将重试。因为delay只是接收一个发射并延迟它,所以它总是再次发射null。因此,它不断地重新订阅。
那么,对2的答案是它永远在运行;您可能正在执行此代码之外的其他操作来取消订阅。对于3,您永远不会得到onCompleted,因为它永远不会完成。对于4,顺序无关紧要,因为你在无限期地重复。
现在的问题是,你如何得到正确的行为?它非常简单,只需使用takeUntil而不是skipWhile。这样,您就可以不断重复,直到得到所需的结果,从而在希望流结束时终止流。
下面是一个代码示例:

Observable<Boolean> source = ...; // Something that eventually emits true

source
    .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
    .takeUntil(result -> result)
    .filter(result -> result)
    .subscribe(
        res -> System.out.println("onNext(" + res + ")"),
        err -> System.out.println("onError()"),
        () -> System.out.println("onCompleted()")
    );

在本例中,source发出布尔值。我每1秒重复一次,直到光源发出true。我一直服用直到resulttrue为止。我过滤掉所有false的通知,所以订户在true之前不会收到它们。

07-27 21:57