本文为原创文章,转载请标明出处

目录

  1. Subject
  2. BehaviorSubject
  3. ReplaySubject
  4. AsyncSubject

1. Subject

总的来说,Subject 既是能够将值多播给多个观察者的特殊的可观察对象,因为可以添加观察者并使用 subscribe 方法来接收值;又是观察者,因为它有 next(v)error(e)complete() 方法。下面这段代码很好的说明了每个 Subject 既是 Observable 又是 Observer

var subject = new Rx.Subject();

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
}); subject.next(1);
subject.next(2);

输出:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

2. BehaviorSubject

BehaviorSubject 能够保存当前值,当有新的观察者订阅时,就会立即从BehaviorSubject 接收到当前值。下面这段代码,初始值为0,尽管第二个观察者是在 2 发送出去之后订阅的,但是 BehaviorSubject 保存了当前值,在第二个观察者订阅时立即从BehaviorSubject 接收到了当前值 2

var subject = new Rx.BehaviorSubject(0);

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
}); subject.next(1);
subject.next(2); subject.subscribe({
next: (v) => console.log('observerB: ' + v)
}); subject.next(3);

输出:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

3. ReplaySubject

ReplaySubjectBehaviorSubject 相似,ReplaySubject 能够保存指定个数的数据,当有新的观察者订阅时,就会从 ReplaySubject 接收到指定个数的这些值并回放出来。下面这段代码,指定能够保存 3 个数据,当第二个观察者订阅时,获取到保存的三个值 234

var subject = new Rx.ReplaySubject(3);

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
}); subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4); subject.subscribe({
next: (v) => console.log('observerB: ' + v)
}); subject.next(5);

输出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

另外,ReplaySubject 还可以指定 windowTime 来保存到目前为止多久之内的数据,下面这段代码,指定能够保存 100 个数据,指定能够保存到目前为止 500 毫秒之内的数据。

var subject = new Rx.ReplaySubject(100, 500);

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
}); var i = 1;
setInterval(() => subject.next(i++), 200); setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);

输出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

4. AsyncSubject

AsyncSubject 只能将执行完成时的最后一个值发送给观察者。下面这段代码,当 complete() 时才会将最后一个值 5 发送给第一个观察者和第二个观察者。

var subject = new Rx.AsyncSubject();

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
}); subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4); subject.subscribe({
next: (v) => console.log('observerB: ' + v)
}); subject.next(5);
subject.complete();

输出:

observerA: 5
observerB: 5

如有不当之处,请予指正,谢谢~

05-27 18:59