我的要求:
ResponseBody
对象。 是)我有的:
Observable<ResponseBody> api1Call = api1.fetchData();
Observable<ResponseBody> api2Call = api2.fetchData();
Observable<ResponseBody> api3Call = api3.fetchData();
Observable.combineLatest(api1Call, api2Call, api3Call, new Func2<ResponseBody, ResponseBody, ResponseBody, Object>() {
@Override
public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
return null;
}
}).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
Logger.e(throwable, "some error with one of the apis?");
return Observable.empty();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Logger.i("onCompleted");
}
@Override
public void onError(Throwable e) {
Logger.e(e, "onError");
}
@Override
public void onNext(Object o) {
Logger.i("onNext " + o);
}
});
我得到的输出:
some error with one of the apis?
// stacktrace of the error
onCompleted
我是RxJava的新手,非常困惑。我在StackOverflow上找到了一些答案,说
zip
做类似的事情,但是离我的要求还差得远。我猜“组合”运算符之一+适当的异常处理将满足我的需求。到目前为止,真的很难弄清楚我使用的版本:
compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
最佳答案
combineLast
或zip
实现并行,在我的测试中rxjava
将按顺序执行并发射您的项目。 Func2#call
,而是提交onError
。您甚至无法以这种方式获得其他成功任务的结果。 flatMap
,这是在rxjava
中实现并发的传统方法。它还满足您的其他要求。 这是一个小而完整的示例。
我使用一个简单的网站服务进行测试。
我使用
Semaphore
等待所有任务完成,您可以完全忽略它。我将日志添加到http请求中以更好地理解,您也可以完全忽略它。public interface WebsiteService {
@GET
Observable<ResponseBody> website(@Url String url);
}
然后,我使用以下代码通过
rxjava
测试结果。 HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
Retrofit retrofit = new Retrofit.Builder().baseUrl("https://www.google.com")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.client(new OkHttpClient.Builder().addInterceptor(loggingInterceptor).build())
.build();
WebsiteService websiteService = retrofit.create(WebsiteService.class);
final Semaphore s = new Semaphore(1);
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
Observable<ResponseBody> first = websiteService.website("http://github.com");
Observable<ResponseBody> second = websiteService.website("http://stackoverflow.com");
Observable<ResponseBody> third = websiteService.website("http://notexisting.com");
final int numberOfCalls = 3; // testing for three calls
Observable.just(first, second, third)
.flatMap(new Function<Observable<ResponseBody>, ObservableSource<ResponseBody>>() {
@Override
public ObservableSource<ResponseBody> apply(@NonNull Observable<ResponseBody> responseBodyObservable) throws Exception {
return responseBodyObservable.subscribeOn(Schedulers.computation());
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<ResponseBody>() {
private int currentDoneCalls = 0;
private void checkShouldReleaseSemaphore() {
if (currentDoneCalls >= numberOfCalls) {
s.release();
}
}
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull ResponseBody responseBody) {
System.out.println("Retrofit call success " + responseBody.contentType());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("Retrofit call failed " + e.getMessage());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}
@Override
public void onComplete() {
System.out.println("onComplete, All request success");
checkShouldReleaseSemaphore();
}
});
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("All request done");
s.release();
}
我使用
rxjava2
和翻新的adapter-rxjava2
进行测试。compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.1'
更新
github的RxJava2简介页指出了实现并行处理的实用方法。
尽管此示例基于
RxJava2
,但操作flatMap
是已经存在于
RxJava
中。关于android - 在错误情况下如何组合多个RxJava链非阻塞,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/44643106/