问题描述
我有一个事件发射器,它以50Hz的频率发送事件.我想使用异步方法订阅此发射器.该代码如下所示:
I have an event emitter that sends events at 50Hz.I'd like to subscribe to this emitter with an async method. The code looks like the following:
this.emitter = fromEventPattern(this.addHandler, this.removeHandler, (err, char) => [err, char]);
this.rxSubscription = this.emitter.subscribe(this.handleUpdatedValuesComingFromSensor);
和
handleUpdatedValuesComingFromSensor = async (arr: any[]): Promise<void> => {
...
await someMethodAsync();
...
}
我可能错了,但是我的印象是,在那儿等待使发射器立即退出onNext(),因为我已经退出了该方法.
I maybe wrong but I'm under the impression that awaiting in there makes the emitter calls onNext() immediately because I've exited the method.
由于事件发生率,使用控制台调用进行调试非常困难.
This is very difficult to debug with console calls because of the event rate.
我是对还是错?
感谢您的帮助.
编辑1 :
我正在使用面向ES2015的打字稿,因此为异步/等待生成了状态机.
I'm using typescript targetting ES2015 so a state machine is generated for async/await.
如果我是对的,如何确保电话不会重叠?我需要计算收到的值的平均值.
If I'm right, how can I ensure that calls do not overlap? I need to compute averages on values I receive.
推荐答案
您是正确的. Rx忽略其订阅函数的返回类型,因此当它遇到第一个await
时,它将忽略从async
函数返回的promise.这意味着:
You are correct. Rx ignores the return types of its subscription functions, so it ignores the promise returned from your async
function when it hits its first await
. This means:
- 一旦其他项目到达可观察对象,Rx将再次调用您的订阅功能.它忽略了返回的承诺,因此它不知道旧的调用仍在进行中.
-
async
函数中的异常将被忽略,因为promise被忽略了.一些承诺库具有可处理此问题的全局未观察到的承诺错误"事件.
- As soon as another item arrives on the observable, Rx will invoke your subscription function again. It ignored the promise that was returned, so it doesn't know the old invocation is still in progress.
- Exceptions from your
async
function will be ignored, since the promise was ignored. Some promise libraries have a global "unobserved promise error" event that can handle this.
这篇关于使用异步功能订阅可观察对象是否安全的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!