我正在尝试通过实现RxJava进行异步rest调用。
下面是实现-
final Observable<List<A>> observableA = Observable.create(new Observable.OnSubscribe<List<A>>() {
@Override
public void call(Subscriber<? super List<A>> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(//another Function call);
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
final Observable<List<B>> observableB = Observable.create(new Observable.OnSubscribe<List<B>>() {
@Override
public void call(Subscriber<? super List<B>> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(//another Function call);
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
Observable<List<C>> reservationObserv = Observable.zip(observableA, observableB, new Func2< List<A>, List<B> , List<C>>() {
@Override
public List<C> call(final List<A> a, final List<B> b) {
// Merge the response
return c;
}
});
到目前为止,先执行ObservableA,然后再执行ObservableB。
任何人都可以提出为什么呼叫不是异步的建议。
提前致谢。
当我以以下方式执行时,先执行observableB然后执行ObservableA
final Observable<List<A>> observableA = Observable.create(new Observable.OnSubscribe<List<A>>() {
@Override
public void call(final Subscriber<? super List<A>> subscriber) {
Runnable run = new Runnable() {
@Override
public void run() {
// Delay of 1000ms
subscriber.onNext(//calling a method);
subscriber.onCompleted();
}
};
executorService.execute(run);
}
});
final Observable<List<B>> observableB = Observable.create(new Observable.OnSubscribe<List<B>>() {
@Override
public void call(final Subscriber<? super List<B>> subscriber) {
Runnable run = new Runnable() {
@Override
public void run() {
// No delay
subscriber.onNext(//calling a method);
subscriber.onCompleted();
}
};
executorService.execute(run);
}
});
Observable<List<C>> observableC = Observable.zip(observableA, observableB, new Func2< List<A>, List<B> , List<C>>() {
@Override
public List<C> call(final List<A> a, final List<B> b) {
// Merge the response
return c;
}
});
最佳答案
默认情况下,RxJava是同步的。因此,在您的第一种情况下,zip运算符将预订observableA
,然后在observableA
完成时将预订observableB
。
在第二种情况下,当您使用执行程序服务时,您实际上是异步的。
为了与您的第一个版本保持同步,因为它在注释中很含糊,您应该查看Schedulers
并告诉RxJava您的订阅应在哪个schdeulers中执行。
Observable<List<C>> observableC = Observable.zip(
observableA.subscribeOn(Schedulers.io()),
observableB.subscribeOn(Schedulers.io()),
(a, b) -> /** ... **/);
observableC.subscribe();
您可以使用不同的调度程序,具体取决于要达到的目标。 (使用Schedulers.io()进行I / O,...)