我是整个Rx和响应式编程的新手,但是我必须处理这种情况。我希望每隔500毫秒就可以观察到一个间隔,该间隔可以通过对REST API的POST请求来检查硬件状态,以查看响应是否发生了变化。因此,一旦更改,我希望立即关闭这种可观察间隔的POST请求,从而将资源留给以后的其他操作。这是一段代码。

myLoop(item_array:number[], otheroption : string){
    for (let item of item_array){
        //set hardware option, still a request
        this.configHardware(item,otheroption)
        //after config the hardware, command hardware take action
            .flatMap(() => {
                //this return a session id for check status
                this.takeHardwareAction()
                    .flatMap( (result_id) => {
                        //I want every 500ms check if hardware action is done or not
                        let actionCheckSubscription = IntervalObservable.create(500).subscribe(
                            () => {
                                //So my question is, can I unsubscribe the actionCheckSubscription in here on condition change? For example,
                                if (this.intervalCheckingStatus(result_id))
                                    actionCheckSubscription.unsubscribe() ;
                            }
                        ) ;
                    })

            })
    }
}

最佳答案

您可以使用Observable.fromconcatMap遍历所有项目,然后在验证通过filter后立即将take(1)filter组合使用以停止时间间隔:

myLoop(item_array:number[], otheroption : string) {
    return Observable.from(item_array)
        .concatMap(item => this.configHardware(item, otheroption)
            .switchMap(resultId => Observable.interval(500)
                .switchMapTo(this.intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast
                .take(1) // and complete after 1 value was valid
                .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not)
            )
        );
}

// usage:
myLoop([1,2,3,4], "fooBar")
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occured", error),
        () => console.log("All items are valid now")
    );




这是一个包含模拟数据的实时示例



const Observable = Rx.Observable;

function myLoop(item_array) {
    return Observable.from(item_array)
        // if you don't mind the execution-order you can use "mergeMap" instead of "concatMap"
        .concatMap(item => configHardwareMock(item)
            .switchMap(resultId => Observable.interval(500)
                .do(() => console.info("Checking status of: " + resultId))
                .switchMap(() => intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast
                .take(1) // and complete after 1 value was valid
                .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not)
            )
        );
}

// usage:
myLoop([1,2,3,4])
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occured", error),
        () => console.log("All items are valid now")
    );

// mock helpers
function configHardwareMock(id) {
  return Observable.of(id);
}

function intervalCheckingStatus(resultId) {
  if (Math.random() < .4) {
    return Observable.of(false);
  }

  return Observable.of(true);
}

<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

关于javascript - 如何取消订阅或处理Angular2或RxJS中可观察到的间隔?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42027804/

10-12 03:59
查看更多