我的要求:

  • N并行调用改造
  • 等待所有调用完成(成功或失败)
  • 如果k(0 null或错误对象,而成功的调用可以返回ResponseBody对象。
  • 可选:如果RxJava有一种简单的方法来区分每个成功或失败的调用,那么很好,如果不是,我将分析响应并自己解决

  • 是)我有的:
            Observable<ResponseBody> api1Call = api1.fetchData();
            Observable<ResponseBody> api2Call = api2.fetchData();
            Observable<ResponseBody> api3Call = api3.fetchData();
    
            Observable.combineLatest(api1Call, api2Call, api3Call, new Func2<ResponseBody, ResponseBody, ResponseBody, Object>() {
                @Override
                public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
                    Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
                    return null;
                }
            }).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
                @Override
                public Observable<?> call(Throwable throwable) {
                    Logger.e(throwable, "some error with one of the apis?");
                    return Observable.empty();
                }
            }).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Object>() {
                        @Override
                        public void onCompleted() {
                            Logger.i("onCompleted");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Logger.e(e, "onError");
                        }
    
                        @Override
                        public void onNext(Object o) {
                            Logger.i("onNext " + o);
                        }
                    });
    

    我得到的输出:
    some error with one of the apis?
    // stacktrace of the error
    onCompleted
    

    我是RxJava的新手,非常困惑。我在StackOverflow上找到了一些答案,说zip做类似的事情,但是离我的要求还差得远。我猜“组合”运算符之一+适当的异常处理将满足我的需求。到目前为止,真的很难弄清楚

    我使用的版本:
    compile 'io.reactivex:rxjava:1.3.0'
    compile 'io.reactivex:rxandroid:1.2.1'
    compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
    

    最佳答案

  • 您无法通过combineLastzip实现并行,在我的测试中rxjava将按顺序执行并发射您的项目。
  • 如果您的任务之一失败,则不会调用您的Func2#call,而是提交onError。您甚至无法以这种方式获得其他成功任务的结果。
  • 解决方案是flatMap,这是在rxjava中实现并发的传统方法。它还满足您的其他要求。

  • 这是一个小而完整的示例。

    我使用一个简单的网站服务进行测试。

    我使用Semaphore等待所有任务完成,您可以完全忽略它。我将日志添加到http请求中以更好地理解,您也可以完全忽略它。
    public interface WebsiteService {
    
        @GET
        Observable<ResponseBody> website(@Url String url);
    
    }
    

    然后,我使用以下代码通过rxjava测试结果。
       HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
        loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
    
        Retrofit retrofit = new Retrofit.Builder().baseUrl("https://www.google.com")
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .client(new OkHttpClient.Builder().addInterceptor(loggingInterceptor).build())
                .build();
        WebsiteService websiteService = retrofit.create(WebsiteService.class);
    
        final Semaphore s = new Semaphore(1);
        try {
            s.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        Observable<ResponseBody> first = websiteService.website("http://github.com");
        Observable<ResponseBody> second = websiteService.website("http://stackoverflow.com");
        Observable<ResponseBody> third = websiteService.website("http://notexisting.com");
    
        final int numberOfCalls = 3; // testing for three calls
    
        Observable.just(first, second, third)
                .flatMap(new Function<Observable<ResponseBody>, ObservableSource<ResponseBody>>() {
                    @Override
                    public ObservableSource<ResponseBody> apply(@NonNull Observable<ResponseBody> responseBodyObservable) throws Exception {
                        return responseBodyObservable.subscribeOn(Schedulers.computation());
                    }
                })
                .subscribeOn(Schedulers.computation())
                .subscribe(new Observer<ResponseBody>() {
    
                    private int currentDoneCalls = 0;
    
                    private void checkShouldReleaseSemaphore() {
                        if (currentDoneCalls >= numberOfCalls) {
                            s.release();
                        }
                    }
    
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
    
                    }
    
                    @Override
                    public void onNext(@NonNull ResponseBody responseBody) {
                        System.out.println("Retrofit call success " + responseBody.contentType());
                        synchronized (this) {
                            currentDoneCalls++;
                        }
                        checkShouldReleaseSemaphore();
                    }
    
                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("Retrofit call failed " + e.getMessage());
                        synchronized (this) {
                            currentDoneCalls++;
                        }
                        checkShouldReleaseSemaphore();
                    }
    
                    @Override
                    public void onComplete() {
                        System.out.println("onComplete, All request success");
                        checkShouldReleaseSemaphore();
                    }
    
                });
    
        try {
            s.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("All request done");
            s.release();
        }
    

    我使用rxjava2和翻新的adapter-rxjava2进行测试。
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex.rxjava2:rxjava:2.1.0'
    compile 'com.squareup.retrofit2:retrofit:2.3.0'
    compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
    compile 'com.squareup.okhttp3:logging-interceptor:3.8.1'
    

    更新

    github的RxJava2简介页指出了实现并行处理的实用方法。



    尽管此示例基于RxJava2,但操作flatMap
    已经存在于RxJava中。

    关于android - 在错误情况下如何组合多个RxJava链非阻塞,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44643106/

    10-10 19:47