我正在尝试对rxjava使用改造。我彼此之间或与我创建的可观测对象链接改造可观测对象时遇到问题。一些例子:

    Observable<List<Friend>> friendsListObservable = friendsService.getFriends();
    Observable<Void> updateReqestObservable = friendsListObservable.switchMap(friends -> {
        Log.d(TAG, "Hello");
        return userAPI.updateFriends(session.getUserId(), friends);
    }).subscribe();


一切都将被调用,直到切换到map。因此,您好永远不会显示,但是如果我返回实例Observable.just(null)而不是可观察到的改造,它就可以正常工作。同样,如果我使用无需链接即可观察到的改造,它也可以工作。

编辑1:
这是一个Android应用。似乎根本没有调用map运算符。有时也会发生可观察到的改造。我仍然认为它与线程有关。据我了解,发出项目时会调用一个运算符,但是调用onNext不会触发map运算符。下面是我的整个代码:

public Observable<List<FacebookFriend>> getFriends() {
    PublishSubject<List<FacebookFriend>> friendsPublishSubject = PublishSubject.create();

    Observable<List<FacebookFriend>> returnObservable = friendsPublishSubject.doOnSubscribe(() -> {
    Log.d(TAG, "OnSubscribe called");
    Session session = Session.getActiveSession();

    if (session != null && session.isOpened()) {
        new Request(session, "/me/friends", null, HttpMethod.GET,
                new Request.Callback() {
                    public void onCompleted(Response response) {
                        JSONObject graphResponse = response.getGraphObject()
                                .getInnerJSONObject();
                        try {
                            JSONArray friends = graphResponse.getJSONArray("data");
                            Gson gson = new Gson();
                            Type listType = new TypeToken<ArrayList<FacebookFriend>>() {
                            }.getType();
                            List<FacebookFriend> friendsList = gson.fromJson(friends.toString(), listType);

                            friendsPublishSubject.onNext(friendsList);
                            friendsPublishSubject.onCompleted();
                        } catch (JSONException e) {
                            e.printStackTrace();
                            friendsPublishSubject.onError(e);
                        }
                    }
                }).executeAsync();
    } else {
        InvalidSessionException exception = new InvalidSessionException("Your facebook session expired");
        friendsPublishSubject.onError(exception);
    }
    });

    return returnObservable.subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());
}

public Observable<Void> updateFriendsList() {
    Observable<List<FacebookFriend>> facebookFriendsListObservable = facebookService.getFriends();

    Observable<Void> updateReqestObservable = facebookFriendsListObservable.map(friends -> {
    Log.d(TAG, "This is never called");
});


}

最佳答案

可以在实际调用点处进行阻止的一种方法是,将其订阅一个主题,然后在代码的任何部分要求已执行请求的末尾,对该主题进行阻止。

例如:

final ReplaySubject<Void> subject = ReplaySubject.create();
friendsService.getFriends()
    .switchMap(friends -> userApi.updateFriends(session.getUserId(), friends))
    .subscribeOn(Schedulers.io())
    .subscribe(subject);

// Do other things that don't require this to be done yet...

// Wherever you need to wait for the request to happen:
subject.toBlocking().singleOrDefault(null);


由于主题也是Observable,因此您甚至可以从方法中返回主题,并稍后对其进行阻止。

综上所述,将update调用用作此处的副作用似乎有些奇怪。 updateFriends还是翻新服务?如果是这样,您可能要考虑使更新调用成为一个同步服务,该服务返回void而不是将在onNext调用中调用的Observable<Void>。如果仍然需要阻止,则可以这样使用forEach

friendsService.getFriends()
    .forEach(friends -> { userApi.updateFriends(session.getUserId(), friends) });


forEachsubscribe非常相似,除了它显式阻止。您甚至可以将其与原始代码一起使用,但是添加一个空的onNext动作并非十分干净。

如果您还可以提供有关您的应用程序结构的更多详细信息(这是否全部在主要方法中?是否是Android应用程序?等),我可以提供更多的指示,以尽可能避免阻塞。

关于android - 改造链观察,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29319903/

10-11 10:56