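I'm new to Rx and to reactive programming in general, but I have to deal with the following situation. I want an interval observable that checks a hardware status every 500 ms by sending a POST request to a REST API and looking at whether the response has changed. As soon as it changes, I want to shut down that interval of POST requests immediately, so that resources are freed for later operations. Here is a piece of code.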
myLoop(item_array: number[], otheroption: string) {
  for (let item of item_array) {
    // set the hardware option (this is also a request)
    this.configHardware(item, otheroption)
      // after configuring the hardware, command it to take action
      .flatMap(() => {
        // this returns a session id that is used to check the status
        this.takeHardwareAction()
          .flatMap((result_id) => {
            // I want to check every 500 ms whether the hardware action is done or not
            let actionCheckSubscription = IntervalObservable.create(500).subscribe(
              () => {
                // So my question is: can I unsubscribe actionCheckSubscription in here when the condition changes? For example:
                if (this.intervalCheckingStatus(result_id))
                  actionCheckSubscription.unsubscribe();
              }
            );
          })
      })
  }
}
Best answer
You can use Observable.from and concatMap to iterate over all the items, then combine filter with take(1) so that the interval stops as soon as a status check passes the filter:
myLoop(item_array: number[], otheroption: string) {
  return Observable.from(item_array)
    .concatMap(item => this.configHardware(item, otheroption)
      // per the question, takeHardwareAction() is what returns the session id used for the status checks
      .switchMap(() => this.takeHardwareAction())
      .switchMap(resultId => Observable.interval(500)
        .switchMapTo(this.intervalCheckingStatus(resultId))
        .filter(status => Boolean(status)) // your validity check goes here; currently just a boolean cast
        .take(1) // complete after the first valid status, which also stops the interval
        .mapTo(item) // optional: map back to "item" so the subscriber learns which item became valid
      )
    );
}
// usage:
myLoop([1, 2, 3, 4], "fooBar")
  .subscribe(
    item => console.log(`Item ${item} is now valid`),
    error => console.error("Some error occurred", error),
    () => console.log("All items are valid now")
  );
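To make the effect of putting take(1) after filter more concrete, here is a minimal, dependency-free sketch of the same idea in plain TypeScript. It does not use RxJS at all: checkStatusOnce, firstValid and the scripted results are hypothetical names and made-up data introduced only for illustration. The point it tries to show is that no further checks are made once the first valid value has been seen.

```typescript
// Scripted results standing in for successive status requests (made-up data).
const scriptedResults: boolean[] = [false, false, true, true, true];
let checksMade = 0;

// Hypothetical stand-in for a single status request; NOT an RxJS or Angular API.
function checkStatusOnce(): boolean {
  const result = scriptedResults[checksMade] === true;
  checksMade += 1; // count every check that was actually performed
  return result;
}

// Rough analogue of `.filter(isValid).take(1)`: keep asking for the next value
// and stop as soon as one passes the predicate.
function firstValid<T>(
  nextValue: () => T,
  isValid: (value: T) => boolean,
  maxChecks: number
): T | undefined {
  for (let i = 0; i < maxChecks; i++) {
    const value = nextValue();
    if (isValid(value)) {
      return value; // first valid value: no further checks are made
    }
  }
  return undefined; // gave up without seeing a valid value
}

const firstValidStatus = firstValid(
  checkStatusOnce,
  (status) => status === true,
  scriptedResults.length
);
console.log(firstValidStatus, checksMade); // true 3
```

Only three of the five scripted checks run, which mirrors how the interval in the answer is torn down once take(1) completes.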
Here is a live example with mock data:
const Observable = Rx.Observable;

function myLoop(item_array) {
  return Observable.from(item_array)
    // if you don't mind the execution order, you can use "mergeMap" instead of "concatMap"
    .concatMap(item => configHardwareMock(item)
      .switchMap(resultId => Observable.interval(500)
        .do(() => console.info("Checking status of: " + resultId))
        .switchMap(() => intervalCheckingStatus(resultId))
        .filter(status => Boolean(status)) // your validity check goes here; currently just a boolean cast
        .take(1) // complete after the first valid status, which also stops the interval
        .mapTo(item) // optional: map back to "item" so the subscriber learns which item became valid
      )
    );
}

// usage:
myLoop([1, 2, 3, 4])
  .subscribe(
    item => console.log(`Item ${item} is now valid`),
    error => console.error("Some error occurred", error),
    () => console.log("All items are valid now")
  );

// mock helpers
function configHardwareMock(id) {
  return Observable.of(id);
}

// emits false in roughly 40% of the calls, true otherwise
function intervalCheckingStatus(resultId) {
  if (Math.random() < .4) {
    return Observable.of(false);
  }
  return Observable.of(true);
}
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
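As for the original question of whether a polling callback may cancel its own timer from the inside: the same stop-on-condition pattern can be sketched without RxJS. This is only an illustration under stated assumptions. pollUntil and the Timers interface are hypothetical names, not part of RxJS or Angular, and the timer functions are injected so that the sketch can be driven either by real timers or by a manually controlled fake.

```typescript
// Minimal timer abstraction (an assumption of this sketch) so that either the
// real setInterval/clearInterval or a manually driven fake can be plugged in.
interface Timers<H> {
  setInterval(callback: () => void, ms: number): H;
  clearInterval(handle: H): void;
}

// Hypothetical helper: calls `isDone` every `intervalMs` and cancels its own
// timer from inside the callback as soon as `isDone` returns true.
// Returns a function that cancels the polling early.
function pollUntil<H>(
  timers: Timers<H>,
  intervalMs: number,
  isDone: () => boolean,
  onDone: () => void
): () => void {
  let stopped = false;
  const handle: H = timers.setInterval(() => {
    if (stopped) {
      return; // ignore a tick that was already queued before cancellation
    }
    if (isDone()) {
      stopped = true;
      // Fine as long as the timer invokes the callback asynchronously,
      // i.e. after `handle` has been assigned.
      timers.clearInterval(handle);
      onDone();
    }
  }, intervalMs);

  return () => {
    stopped = true;
    timers.clearInterval(handle);
  };
}
```

With real timers, one option is to pass thin wrappers such as `{ setInterval: (cb, ms) => setInterval(cb, ms), clearInterval: (h) => clearInterval(h) }` rather than the bare globals, since browsers may reject timer functions that are called as methods of another object.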
Regarding "javascript - How to unsubscribe from or dispose of an interval observable in Angular2 or RxJS?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/42027804/