下面的我的订户都没有调用onNext()或onCompleted()。我尝试通过doOnNext()/ doOnTerminate()实现订户。我也尝试过doAfterTerminate()。我也尝试过显式定义一个订户,而onNext()或onCompleted()都没有被调用。

根据RxJS reduce doesn't continue,不是reduce()没有终止,所以我尝试添加take(1),但是没有用。在同一个stackoverflow问题中,有人说问题可能是我的流永远不会关闭。除了take(1)之外,也许还有其他方法可以关闭流,但是我对ReactiveX的了解还不够。

根据Why is OnComplete not called in this code? (RxAndroid),这可能是该系列中的原始流没有终止。但是我不知道如果我调用我认为应该发出终止信号的take(1)为何会很重要。

基本上,为什么不执行以下行?

System.out.println("doOnNext map.size()=" + map.size());


即使以下代码行已执行98次:

map.put(e.getKey(), e.getValue());


JsonObjectObservableRequest.java

import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {

    public JsonObjectObservableRequest(int method, String url, JSONObject request) {
    jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(),     getResponseErrorListener());
    }

    private Response.Listener<JSONObject> getResponseListener() {
        return new Response.Listener<JSONObject>() {
            @Override
            public void onResponse(JSONObject response) {
                publishSubject.onNext(Observable.just(response));
            }
        };
    }

    private Response.ErrorListener getResponseErrorListener() {
        return new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                Observable<JSONObject> myError = Observable.error(error);
                publishSubject.onNext(myError);
            }
        };
    }

    public JsonObjectRequest getJsonObjectRequest() {
        return jsonObjectRequest;
    }

    public Observable<JSONObject> getObservable() {
    return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable<    JSONObject>>() {
            @Override
            public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
                return jsonObjectObservable;
            }
        });
    }
}


JsonObjectObservableRequest调用代码

import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
    JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
    Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();

    jsonObjectObservable
            .map(json -> {
                try {
                    return NetworkAccountIdDatasource.parseIdJSON(json);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return null;
                }
            })
            .flatMapIterable(x -> x)
            .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
            .reduce(new HashMap<String, String>(), (map, e) -> {
                map.put(e.getKey(), e.getValue());
                return map;
            })
            .take(1)
            .doOnNext(map -> {
                System.out.println("doOnNext map.size()=" + map.size());
            })
            .doOnTerminate(() -> {
                System.out.println("doOnTerminate");
            })
            .subscribe();

    final RequestQueue queue = Volley.newRequestQueue(context);
    queue.add(jsonObjectObservableRequest.getJsonObjectRequest());

最佳答案

您已正确确定了问题,没有在源网络onCompleted()上(在Observable类中)调用JsonObjectObservableRequest,而take(1)也不起作用,因为您将其放在reduce之前。
如您所知,reduce必须在Observable且排放量有限的情况下运行,因为当所有项目都已排放完时,reduce会排放累积的项目,因此,它依靠onCompleted()事件知道观察到的流何时结束。


我认为,就您而言,最好的办法是更改JsonObjectObservableRequest的操作方式,您不需要Subject,可以使用创建方法来包装回调(您可以看到我的答案here ,并了解有关在回调世界与RxJava here之间进行桥接的更多信息。
而且,您不需要先发出Observable的东西,然后再flatmap发出项目,只需使用onNext()发出项目,并使用onError()发出错误,实际上您就隐藏了错误并将其转换为发射,这可能会使其更难以处理下游的错误。
调用onCompleted()表示完成后,应在onResponse()处调用onNext()。在出现错误的情况下,信号通知onError将通知流终止。
另一个问题是取消似乎没有以这种方式处理的请求,您可以向上阅读源以查看如何在包装callbacl调用时完成该请求。

10-07 19:15
查看更多