rxjs的引入
// 如果以这种方式导入rxjs,那么整个库都会导入,我们一般不可能在项目中运用到rxjs的所有功能
const Rx = require('rxjs');
解决这个问题,可以使用深链deep link
的方式,只导入用的上的功能
import {Observable} from 'rxjs/Observable';
这样可以减少不必要的依赖,不光可以优化打包文件的大小,还有利于代码的稳定性
另外目前最新的一种解决方案就是Tree Shaking
, Tree Shaking
只对import语句导入产生作用,对require不起作用。因为tree shaking的工作方式是对代码静态分析,import只能出现在代码的第一层,不能出现在if分支中。而require可以出现在if分支中,参数也是可以动态产生的字符串,所以只能动态执行时才知道require函数式如何执行的,这里Tree Shaking就不起作用了。
rxjs中Tree Shaking不起作用
实际项目中,如果不会使用很多RxJS的功能,建议还是避免导入全部RxJS的做法,使用npm导入然后通过打包工具来组合
Observer的完结
为了让Observable有机会告诉Observer已经没有更多数据了,需要有另外一种通信机制。在Rxjs中,实现这种通信机制的就是Observer的complete函数
如果你没法预测你的程序会不会出现异常,那么就需要使用error参数,如果不需要可以直接给个Null作为第二个参数
const theObserver = {
next: item => console.log(item),
null,
complete: () => console.log('No More Data')
};
何时完结这个Observable对象需要Observable主动调用complete()
在Observable发生error之后,不再会调用后面的complete().因为在一个Observable对象中,要么是完结状态,要么是出错状态。一旦进入出错那么就终结了。
Observable
可观察的对象Observer
观察者
联系两者的桥梁就是subscribe
在Rxjs中,发布者就是Observable,观察者就是subscribe函数
,这样就可以吧观察者和发布者联系起来
如何取消订阅
const Observable = Rx.Observable;
const onSubscribe = observer => {
let number = 1;
const handle = setInterval(() => {
observer.next(number++);
}, 1000);
return {
unsubscribe: () => {
clearInterval(handle);
}
};
};
const source$ = new Observable(onSubscribe);
const subscription = source$.subscribe(item => console.log(item));
setTimeout(() => {
subscription.unsubscribe();
}, 3500);
Observable产生的事件,只有Observer通过subscribe订阅之后才会收到,在unsubscribe之后就不再收到
Hot Observable和Cold Observable
如果一个Observable对象同时有多个Observer订阅,如果A在B之前订阅,那么B该不该订阅到错过
的那些数据流。
如果错过就错过了那么这样的Observable成为Hot,但是如果B仍然从头开始订阅这个Observable那么这样的成为Cold
如果每次订阅的时候, 已经有⼀个热的“⽣产者”准备好了, 那就是Hot Observable, 相反,如果每次订阅都要产⽣⼀个新的⽣产者, 新的⽣产者就像汽车引擎⼀样刚启动时肯定是冷的, 所以叫Cold Observable
复杂的问题可以被分解为三个小问题
- 如何产生事件
- 如何响应事件
- 什么样的发布者关联什么样的观察者,也就是何时调用subscribe
Observable产生的事件,只有Observer通过subscribe订阅之后才会收到,在unsubscribe之后就不会收到Observable.create()
用来创建一个Observable对象
在RXJS中,和数组的map一样,作为操作符的map也接受一个函数参数,不同之处是,对于数组的map,是把每个数组元素都映射为一个新的值,组合成一个新的数组
操作符分类
- 创建类(creation)
- 转化类(transformation)
- 过滤类(filtering)
- 合并类(combination)
- 多播类(multicasting)
- 错误处理类(error Handling)
- 辅助⼯具类(utility)
- 条件分⽀类(conditional&boolean)
- 数学和合计类(mathmatical&aggregate)
静态操作符的导入路径rxjs/add/observable/
实例操作符的导入路径rxjs/add/operator/
在链式调用中,静态操作符只能出现在首位,实例操作符则可以出现在任何位置。
Tree Shaking
帮不上Rxjs什么忙,因为Tree Shaking只能做静态代码检查,并不是程序运行时去检测一个函数是否真的被调用、只有一个函数在任何代码中都没有引用过,才会认为这个函数不会被引用。但是RxJS任何一个操作符都是挂在Observable类或者Observable.prototype
上的, 赋值给Observable或者Observable.prototype上某个属性在Tree Shaking看来就是被引⽤, 所以, 所
有的操作符, 不管真实运⾏时是否被调⽤, 都会被Tree Shaking认为是会⽤到的代码, 也就不会当做死代码删除。
退订资源的基本原则:当不再需要某个Observable对象获取数据的时候,就要退订这个Observable对象
在对上游的数据处理中,利用try...catch...的组合捕获project调用的可能的错误,如果真的有错误,那就调用下游的error函数
const sub = this.subscribe({
next: value => {
try{
observer.next(project(value))
}catch(err) {
observer.error(err);
}
},
error: err => observer.error(err),
complete: () => observer.complete()
});
关联Observable
- 给Observable打补丁
这种方式比较简单,可以直接绑定在prototype上,如果是静态属性直接绑定在类上面
- 使用bind绑定特定的Observable对象
// 比如我们自己创建了一个map方法
function map(project) {
return new Observable(observer => {
const sub = this.subscribe({
next: value => observer.next(project(value)),
error: err => observer.next(error),
complete: () => observer.complete()
});
return {
unsubscribe: () => {
sub.unsubscribe();
}
};
});
}
// 这个时候我们就可以主动使用bind改变this的指向
const result$ = map.bind(source$)(x => x * 2);
// 或者直接使用call
const result$ = map.call(source$, x => x * 2);
- 使用lift
lift是Observable的实例函数,它会返回一个新的Observable对象,通过传递给lift的函数参数可以赋予这个新的Observable对象特殊的功能
function map(project) {
return this.lift(function(source$) {
return source$.subscribe({
next: value => {
try{
this.next(project(value));
}catch(err) {
this.error(err);
}
},
error: err => this.error(error),
complete: () => this.complete()
});
});
}
Observable.prototype.map = map;
create
Observable.create() 其实就是简单的调用了Observable的构造函数
Observable.create = function(subscribe) {
return new Observable(subscribe);
}
of
range
range(1, 10) 从1开始吐出10个数据
range(1.5, 3) 从1.5开始吐出3个数据,每次加1
generate
generate类似一个for循环,设定一个初始值,每次递增这个值,知道满足某个条件为止
使用generate实现range功能
const range = (start, count) => {
const max = start + count;
return Observable.generate(
start,
value => value < max,
value => value + 1,
value => value
);
};
所有能够使用for循环完成的操作,都可以使用generate来实现
repeat 重复数据的数据流
const source$ = Observable.of(1,2,3);
const repeated$ = source$.repeat(10);
// 将source$中的数据流重复10遍
empty()
产生一个直接完结的Observable对象
throw()
产生的Observable对象什么都不做,直接抛出错误
never()
产生的Observable对象什么也不做,既不吐出数据,也不产生错误
interval()
接受一个数值类型的参数,代表产生数据的间隔毫秒数
timer()
第一个参数可以是一个数值,表示多少毫秒之后吐出第一个数值0
如果存在第二个参数,那就会产生一个持续吐出数据的Observable对象,第二个参数就是时间间隔
// 2s后。每隔1s产生一个数值,该数值从0开始递增
const source$ = Observable.timer(2000, 1000);
from()
可以将一切转化为Observable
fromPromise()
可以将Promise对象转化为Observable对象,Promise如果成功则调用正常的成功回调,如果失败则调用失败的回调
fromEvent()
将DOM事件转化为Observable对象中的数据
// 将点击事件转化为Observable
const source$ = Observble.fromEvent(document.querySelector('#id'), 'click');
ajax()
用来将ajax的返回转化为Observable对象
repeatWhen()
接受一个函数作为参数,这个函数在上游第一次产生异常是被调用,这个函数应该返回一个Observable对象
const notifier = () => {
return Observable.interval(1000);
};
const source$ = Observable.of(1,2,3);
const repeat$ = source$.repeatWhen(notifier);
defer()
当defer产生的Observable对象呗订阅的时候,defer的函数参数就会被调用,逾期这个函数返回另外一个Observable
const observableFactory = () => Observable.of(1,2,3);
const source$ = Observable.defer(observableFacatory);
合并类操作符
不少合并类操作符都有两种形式,既提供静态操作符,又提供实例操作符。
concat
concat可以将多个Observable的数据内容一次合并
const source1$ = Observable.of(1,2,3);
const source2$ = Observable.of(4,5,6);
const concated$ = source1$.concat(source2$);
// 或者静态操作符
const concated$ = Observable.concat(source1$, source2$);
concat开始从下一个Observable抽取数据是发生在前一个Observable对象完结之后,所以参与到这个concat之中的Observable对象应该都能完结。如果一个Observable对象不完结,那排在后面的Observable对象永远没有上场的机会
// source1$不完结,永远轮不到source2$上场
const source1$ = Observable.interval(1000);
const source2$ = Observable.of(1);
const concated$ = source1$.concat(source2$);
merge
先到先得快速通过
merge同样支持静态和实例形式的操作符
const Observable = Rx.Observable;
const source1$ = Observable.timer(0, 1000).map(x => x + 'A');
const source2$ = Observable.timer(500, 1000).map(x => x + 'B');
const merged$ = Observable.merge(source1$, source2$);
merged$.subscribe(console.log, null, () => console.log('complete'));
merge第一时间会subscribe上游所有的Observable,然后才去先到先得的策略,任何一个Observable只要有数据下来,就会传给下游的Observable对象
merge的第一个Observable如果产生的是同步数据流,那会等第一个同步数据流产生完毕之后,再回合并下一个Observable对象,因此merge的主要适用场景仍然是异步数据流。一个比较常用的场景就是用于合并DOM事件
merge还有一个可选的参数concurrent
,用于指定同时合并的Observable对象的个数
const source1$ = Observable.timer(0, 1000).map(x => x+'A');
const source2$ = Observable.timer(500, 1000).map(x => x+'B');
const source3$ = Observable.timer(1000, 1000).map(x => x+'C');
const merged$ = source1$.merge(source2$, source3$, 2);
merged$.subscribe(console.log, null, () => console.log('complete'));
// 0A 0B 1A 1B 2A 2B...
这里就限定了优先合并2个Observable对象。而第一二个又不会完结,所以source3$没有出头之日。
zip
zip将上游的两个Obserable合并,并且将他们中的数据一一对应。
// 基本用法
const source1$ = Observable.of(1,2,3);
const source2$ = Observable.of(4,5,6);
const zipped$ = Observable.zip(source1$, source2$);
zipped$.subscribe(console.log, null, () => console.log('completed'));
// [1,4] [2,5] [3,6] completed
当使用zip的时候,它会立刻订阅上游Observable,然后开始合并数据。对于zip而言上游任何一个Observable完结,zip只要给这个完结的Observable对象吐出所有的数据找到配对的数据,那么zip就会给下游一个complete信号
const source1$ = Observable.interval(1000);
const source2$ = Observable.of('a', 'b', 'c');
// [0, 'a'] [1, 'b'] [2, 'c'] complete
但是这里也会有一个问题,如果某个上游的source1$吐出的数据很快,但是source$2吐出的数据慢,那么zip就不得不先存储source1$的数据
如果使用zip组合超过两个Observable对象,游戏规则依然一样,组合而成的Observable吐出的数据依然是数组
combineLatest
合并最后一个数据,从所有输入Observable对象中那最后一个产生的数据(最新数据),然后把这些数据组合起来传给下游。
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const result$ = source1$.combineLatest(source2$);
我们也可以自由的定制下游数据
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const project = (a, b) => `${a} and ${b}`;
const result$ = source1$.combineLatest(source2$, project);
多重依赖的问题:
const original$ = Observable.timer(0, 1000);
const source1$ = original$.map(x => x + 'a');
const source2$ = original$.map(x => x + 'b');
const result$ = source1$.combineLatest(source2$);
withLatestFrom
功能类似于combineLatest,但是给下游推送数据只能由一个
const source1$ = Observable.timer(0, 2000).map(x => 1000 * x);
const source2$ = Observable.timer(500, 1000);
const result$ = source1$.withLatestFrom(source2$, (a,b) => a + b);
// 101 203 305 407...
race
第一个吐出数据的Observable对象就是胜者,race产生的Observable就会完全采用Observable对象的数据,其余的输入Observable对象则会被退订而抛弃。
const source1$ = Observable.timer(0, 2000).map(x => x + 'a');
const source2$ = Observable.timer(500, 2000).map(y => y + 'b');
const winner$ = source1$.race(source2$);
winner$.subscribe(console.log);
// 1a 2a 3a...
startWith
让一个Observable对象在被订阅的时候,总是先吐出指定的若干数据
const origin$ = Observable.timer(0, 1000);
const result$ = origin$.startWith('start');
// start
// 0
// 1
startWith的操作符就是为了满足链式调用的需求
original$.map(x => x * 2).startWith('start').map(x => x + 'ok');
forkJoin
只有当所有的Observable对象都完结,确定不会有新的数据产生的时候,forkJoin就会把所有输入的Observable对象产生的最后一个数据合并成给下游唯一的数据
const source1$ = Observable.interval(1000).map(x => x + 'a').take(1);
const source2$ = Observable.interval(1000).map(x => x + 'b').take(3);
const concated$ = Observable.forkJoin(source1$, source2$);
concated$.subscribe(console.log);
// ["0a", "2b"]
高阶Observable
所谓高阶Observable,指的就是产生数据依然是Observable的Observable
// 高阶Observable示例
const ho$ = Observable.interval(1000).take(2)
.map(x => Observable.interval(1500).map(y => x + ':' + y));
concatAll
会对其内部的Observable对象做concat操作
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
const concated$ = ho$.concatAll();
// 0:0 0:1 1:0 1:1
concatAll首先会订阅上游产生的第一个内部的Observable对象,抽取其中的数据,然后只有当第一个Observable完结的时候才回去订阅第二个Observable。这样很容易产生数据积压
mergeAll
和concatAll()功能类似,但是只要上游产生了数据,mergeAll就会立即订阅
switch
switch的含义就是切换,总是切换到最新的内部Observable对象获取数据。每当switch的上游高阶Observable产生一个内部Observable对象,witch都会⽴刻订阅最新的内部Observable对象上, 如果已经订阅了之前的内部Observable对象, 就会退订那个过时的内部Observable对象, 这个“⽤上新的, 舍弃旧的”动作, 就是切换。
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
const result$ = ho$.switch();
exhaust
exhaust在耗尽当前内部Observable数据之前不会切换到下一个内部Observable对象。和switch一样,exhaust产生的Observable对象完结前提是最新的内部Observable对象完结而且上游高阶Observable对象完结
count
统计上游Observable对象吐出所有数据的个数
const source$ = Observable.of(1,2,3).concat(Observable.of(4,5,6));
const count$ = source$.count(); // 6
max, min
取的最小值和最大值
reduce
规约统计
const source$ = Observable.range(1, 100);
const reduced$ = source$.reduce((acc, current) => acc + current, 0);
// 参数基本和js中的一致
find和findIndex
在某些情况下,我们希望可以将find和findIndex结合在一起,我们可以这样做
const source$ = Observable.of(3,1,4,1,5,9);
const isEven = x => x % 2 === 0;
const find$ = source$.find(isEven);
const findIndex$ = source$.findIndex(isEven);
const zipped$ =find$.zip(findIndex$);
defaultIfEmpty
defaultIfEmpty()除了检测上游Observable对象是否为空之外,还要接受一个默认值作为参数,如果上游Observable对象是空的,那就把默认值吐出来
const new$ = source$.defaultIfEmpty('this is default');
filter
过滤
first
如果first不接受参数,那么就是获取的上游的第一个数据
如果first接受函数作为参数,那么就会获取上游数据中满足函数条件的第一个数据
last
工作方式与first刚好相反,从上游数据的末尾开始寻找符合条件的元素
takeWhile
接受一个判定函数作为参数
const source$ = Observable.range(1, 100);
const takeWhile$ = source$.takeWhile(
value => value % 2 === 0
);
takeUtil
takeUtil是一个里程碑式的过滤类操作符,因为takeUtil让我们可以用Observable对象来控制另一个Observable对象的数据产生
在RxJS中,创建类操作符是数据流的源头,其余所有操作符最重要的三类就是合并类、过滤类和转化类。
map
map用来改变数据流中的数据,具有一一对应的映射功能
const source$ = Rx.Observable.of(1,2,3);
// 注意这里只能使用普通函数,箭头函数中的this是绑定在执行环境上的,无法获取context中的值
const mapFunc = function(value, index) {
return `${value} ${this.separator} ${index}`;
}
const context = {separator: ':'};
const result$ = source$.map(mapFunc, context);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
mapTo
无论上游产生什么数据,传给下游的都是同样的数据
// 将result$中的数据都映射成A
const result$ = source$.mapTo('A');
pluck
pluck就是把上游数据中特定字段的值拔
出来
const source$ = Rx.Observable.of(
{name: 'RxJS', version: 'v4'},
{name: 'React', version: 'v15'},
{name: 'React', version: 'v16'},
{name: 'RxJS', version: 'v5'}
);
const result$ = source$.pluck('name');
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
// RxJS
// React
// React
// RxJS
// complete
上面的代码中,pluck方法将对象中的键对应的值获取出来
获取DOM事件中的值
const click$ = Rx.Observable.fromEvent(document, 'click');
const result$ = click$.pluck('target', 'tagName');
// HTML
将上游数据放在数组中传给下游操作符
bufferTime
用一个参数来指定产生缓冲窗口的间隔
const source$ = Rx.Observable.timer(0, 100);
// 参数400,就会把时间划分为连续的400毫秒长度区块,上游传来的数据不会直接传给下游,而是在该时间区块的开始就新建一个数组对象推送给下游
const result$ = source$.bufferTime(400);
如果上游在短时间内产生了大量的数据,那bufferTime就会有很大的内存压力,为了防止出现这种情况,bufferTime还支持第三个可选参数,用于指定每个时间区间内缓存的最多数据个数
const result$ = source$.bufferTime(400, 200, 2);
bufferCount
根据个数来界定
bufferWhen
接受一个函数作为参数,这个参数名为closingSelector
bufferToggle
buffer
将上游数据放在Observable中传给下游的操作符
windowTime
用一个参数来指定产生缓冲窗口的间隔
windowCount
windowToggle
window
高阶map
所有的高阶map的操作符都有一个函数参数project,但是和普通map不同,普通map只是把一个数据映射成另外一个数据,高阶map的函数参数project把一个数据映射成一个Observable对象
const project = (value, index) => {
return Observable.interval(100).take(5);
}