import React from 'react';
import { Observable } from 'rxjs'; const FlowPage = () => { const onSubscribe = observer => {
observer.next(1);
observer.next(2);
observer.next(3);
} // 创建一个发布者
// Observable是一个特殊类,它接受一个处理Observer的函数
// 而Observer就是一个普通的对象,
// 对于Observer对象要求:它必须包含一个名为next的属性(是一个函数)
// next用于接收被推过来的数据
const source$ = new Observable(onSubscribe); // 参数就是观察者对象
const theObserver = {
next : item => console.log(item)
} // 一个观察者调用Observable对象的subscribe函数
source$.subscribe(theObserver) return <h1>rxjs学习</h1>;
}; export default FlowPage;

跨越时间的Observable

// 推送数据可以有时间间隔。
const onSubscribe = observer => {
let number = 1;
const Timer = setInterval(()=>{
observer.next(number++)
if(number > 3){
clearInterval(Timer)
}
},1000)
}

永无止境的Observable

假如我们不中断这个程序,让它一直运行下去这个程序也不会消耗更多的内存。

Observable对象每次只吐出一个数据,然后这个数据就被Observer消化处理了,不会存在数据的堆积。

const onSubscribe = observer => {
let number = 1;
const Timer = setInterval(()=>{
observer.next(number++)
},1000)
}

Observable的完结

Observer时刻准备着接收数据,如果没有推送数据了,相关的资源不会被释放,为了让Observer明确知道这个数据流已经不会再有新的数据,

需要调用Observer的complete函数。

import React from 'react';
import { Observable } from 'rxjs'; const FlowPage = () => { // 推送数据可以有时间间隔。
const onSubscribe = observer => {
let number = 1;
const Timer = setInterval(()=>{
observer.next(number++)
if(number > 3){
clearInterval(Timer)
observer.complete();
}
},1000)
}
const source$ = new Observable(onSubscribe); const theObserver = {
next : item => console.log(item),
complete:()=> console.log(' no more data')
} source$.subscribe(theObserver) return <h1>rxjs学习</h1>;
}; export default FlowPage;

observable的错误处理

// 一旦进入出错状态,observable就终结了,就不会再调用后面的next()和complete()
// 调用了complete()函数终结,也不能再调用next()和error()
const onSubscribe = observer => {
observer.next(1);
observer.error('something wrong!')
observer.complete()
}
const source$ = new Observable(onSubscribe); const theObserver = {
next : item => console.log(item),
error: err => console.log(err),
complete:()=> console.log(' no more data')
} source$.subscribe(theObserver)

Observable简洁形式

// 为了让代码更加简洁,没有必要创建一个参数对象。
// subscribe也可以直接接受函数作为参数,
// 第一个参数如果是函数类型,就会被认为是next
// 第二参数被认为是error
// 第三个参数complete
source$.subscribe(
item => console.log(item),
err => console.log(err),
()=> console.log(' no more data')
)

Observable退订

// 返回一个对象,并且对象包含了unsubscribe函数,表示退订
const onSubscribe = observer => {
let number = 1;
const Timer = setInterval(()=>{
observer.next(number++)
},1000)
return {
unsubscribe:()=>{
clearInterval(Timer)
}
}
}
const source$ = new Observable(onSubscribe); // subscribe函数的返回结果存为变量subscription
const subscription = source$.subscribe(
item => console.log(item),
err => console.log(err),
()=> console.log(' no more data')
) // 3.5s后调用退订
// 3.5s后不再接受到被推送的数据,但是Observable的source$资源并没有终结
// 因为始终没有调用complete,只不过再也不会调用next函数了
setTimeout(()=>{
subscription.unsubscribe()
},3500)

修改以下代码,便于观察

// 返回一个对象,并且对象包含了unsubscribe函数,表示退订
const onSubscribe = observer => {
let number = 1;
const Timer = setInterval(()=>{
console.log('in onSUbscribe ',number)
observer.next(number++)
},1000)
return {
unsubscribe:()=>{
// clearInterval(Timer)
}
}
}

执行结果如下:

rxjs简单的Observable用例-LMLPHP

由此可见,Observable对象source$在退订以后依然在不断调用next函数,

但是已经断开了source$对象和Observer的连接。

所以onSubscribe中如何调用next,observer都不会做出任何响应

05-17 01:57