RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的observer。今天我们就来学习一下rxJava,并分析一下它源码感受一下它的观察者模式。
RxJava的简单使用
一、mavan的pom.xml中增加rxjava的依赖
这里我们用的是rxjava1.3.0,目前最新的已经更新到2了。
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.3.0</version>
</dependency>
二、测试的用类
import rx.Observable;
import rx.Subscriber;
/**
* @author huhx
*/
public class RxJavaTest {
public static void main(String[] args) {
Observable<String> observable = Observable.just("One", "Two", "Three");
observable.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
} @Override
public void onError(Throwable item) {
System.out.println("onError");
} @Override
public void onNext(String item) {
System.out.println("Item is " + item);
}
});
}
}
三、打印的结果如下:
Item is One
Item is Two
Item is Three
onCompleted
RxJava源码的简单分析
根据上述的代码,我们简单分析一下程序的流程。
一、首先Observable.just("One", "Two", "Three")代码:
just是一个工厂方法,用心构建一个Observable对象。
public static <T> Observable<T> just(T t1, T t2, T t3) {
return from((T[])new Object[] { t1, t2, t3 });
}
from方法的代码如下:
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return unsafeCreate(new OnSubscribeFromArray<T>(array));
}
二、我们重点是看unsafeCreate方法:
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
对于RxJavaHooks.onCreate()方法,第一次会先执行RxJavaHooks静态块的代码。
static {
init(); // 初始化了很多RxJavaHooks跟事件有着的变量
}
onCreate方法中,这个没有怎么看懂后续再会。
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
三、对于observable.subscribe()代码
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
Observable的subscribe方法代码如下:
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
} subscriber.onStart(); if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
} try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
RxJavaHooks.onObservableError(r);
throw r; // NOPMD
}
}
return Subscriptions.unsubscribed();
}
}
四、整个的正常流程会走到16行的代码,这个是我们重点的分析地方。如果是异常情况,则只会执行onError方法
对于这个例子中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)得到的结果OnSubscribeFromArray这个类。调用它的call方法。
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
setProducer方法执行的代码如下:
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
// middle operator ... we pass through unless a request has been made
if (toRequest == NOT_SET) {
// we pass through to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
这里会走到21行的代码,跟进去FromArrayProducer的request方法:
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}
对于fastPath()的代码如下:
void fastPath() {
final Subscriber<? super T> child = this.child; for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
执行onNext()方法,其中t就是Observable.just()方法的参数。child就是observable.subscribe的定义的参数。