我是RxJava的新手,我知道平面地图用于将发射的项目映射到可观察的项目。我也知道,基于documentation,所有发出的可观察值都将合并(展平)为单个可观察流。

我想知道如果这些内在可观测值中的任何一个完成,会发生什么?

例如:我有一个可观察到的发射项目数据键。我必须进行另一个异步http调用才能从服务器获取项目数据,因此我通过使用另一个可观察的对象来调用它。我使用平面图将这两者连接起来,并创建一个主要的可观察对象。

以下“ SomeMethodThatWantsItems”的run()方法何时被调用?

public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine)
{
    Consumer<Item> onNextConsumer =
    Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word");
    searchObservable
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Consumer<Item>(){
                           @Override
                           public void accept(@NonNull Item item) throws Exception {
                               //Do stuff with the item
                           }
                       }
                , new Consumer<Exception>() { //some implementation of onErrorConsumer
                    }
                 //OnComplete
                , new Action(){

                        @Override
                        public void run() throws Exception {
                            //When does this get called??? after the search complete or when the first http call is successful?
                        }
                    });

}

private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord)
{
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception {

            //Assume that our search engine call onFind everytime it finds something
            searchEngine.addSearchListener(new searchEngineResultListener(){
                @Override
                public void onFind(String foundItemKey){
                    emitter.onNext(foundItemKey);
                }

                @Override
                public void onFinishedFindingResults(){
                    emitter.onComplete();
                }
            });

        }
    });
}

private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key)
{

    return Observable.create(new ObservableOnSubscribe<Item>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception {

            //Call the server to get the item
            httpCaller.call(key, new onCompleteListener(){
                @Override
                public void onCompletedCall(Item result)
                {
                    emitter.onNext(result);
                    //The result is complete! end the stream
                    emitter.onComplete();
                }
            });
        }
    });
}

public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){
    //Where everything comes together
    Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord);
    retuern searchResultObservable
            .observeOn(Schedulers.newThread())
            .flatMap(new Function<String, Observable<Item>>(){
                @Override
                public Observable<Item> apply(String key){
                    return getItemByKey(httpCaller, key);
                }
            });
}

最佳答案

onComplete()总是被调用一次,然后流停止。 (这是Observable Contract的一部分)。
这意味着在您的情况下,检索到所有项目后将调用您在onComplete()处的SomeMethodThatWantsItems
如果是flatMap(),则每个内部Observable的完成都会简单地向源Observable发信号,以停止将内部Observable到源Observable的拼合项目,flatMap()合并内部Observable的项目>只要此流发送项目,那么它基本上会将整个内部Observable流消耗到源流中,整个流直到像onComplete()这样的终止事件3为止,因此,如果内部Observable可以发出多于1个项,这意味着它将在源流上产生多于1个的发射。

关于java - RxJava平面图:当其中一个可观察的完整结果发生时会发生什么?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43433742/

10-09 13:19