问题描述
我正在使用 Observable
从全局资源为客户端提供事件订阅接口,我需要根据活动订阅的数量来管理该资源:
- 当订阅数大于 0 时分配全局资源
- 订阅数变为0时释放全局资源
- 根据订阅数调整资源使用策略
RXJS 中监控活跃订阅数的正确方法是什么?
如何在 RXJS 语法中实现以下内容?-
const myEvent: Observable= new Observable();myEvent.onSubscription((newCount: number, prevCount: number) => {如果(新计数 === 0){//释放全局资源} 别的 {//分配全局资源,如果还没有分配}//对于可扩展的资源使用/负载,//重新配置它,基于newCount});
我不希望每次更改都有保证的通知,因此 newCount
+ prevCount
参数.
UPDATE-1
这与此不重复,因为我需要在何时收到通知订阅数量会发生变化,而不仅仅是为了在某个时间点获取计数器.
UPDATE-2
到目前为止没有任何答案,我很快想出了一个非常丑陋且有限的解决方法,通过完全封装,专门针对Subject
类型.非常希望找到合适的解决方案.
UPDATE-3
几个答案后,我仍然不确定如何实现我正在尝试的内容,如下所示:
class CustomType {}CountedObservable 类扩展 Observable{私信:字符串;//随机属性公开 onCount;//需要实现的神奇 Observable构造函数(消息:字符串){//极好的();???this.message = 消息;}//随机方法公共 getMessage() {返回 this.message;}}const a = new CountedObservable('hello');//可以直接创建const msg = a.getMessage();//可以调用方法a.subscribe((data: CustomType) => {//在这里处理订阅;});//需要实现 onCount 的魔法,所以我可以这样做:a.onCount.subscribe((newCount: number, prevCont: number) => {//管理一些外部资源});
如何实现上面这样的 CountedObservable
类,它可以让我订阅它自己,以及它的 onCount
属性来监控它的客户端/订阅的数量?
UPDATE-4
所有建议的解决方案似乎都过于复杂,即使我接受了其中一个答案,我最终还是得到了一个 我自己的完全自定义解决方案之一.
你可以使用 defer 跟踪订阅和 finalize 跟踪完成情况,例如作为运营商:
//一个自定义操作符,用于计算订阅者的数量函数 customOperator(onCountUpdate = noop) {返回函数 refCountOperatorFunction(source$) {让计数器 = 0;返回延迟(()=>{计数器++;onCountUpdate(计数器);返回来源$;}).管道(完成(()=>{柜台 - ;onCountUpdate(计数器);}));};}//只是一个 `onCountUpdate` 的存根函数 noop(){}
然后像这样使用它:
const source$ = new Subject();const 结果$ = source$.pipe(customOperator( n => console.log('更新计数:', n) ));
这是一个说明这一点的代码片段:
const { Subject, of, timer, pipe, defer } = rxjs;const { finalize, takeUntil } = rxjs.operators;const source$ = new Subject();const 结果$ = source$.pipe(customOperator( n => console.log('更新计数:', n) ));//发出事件setTimeout(()=>{源$.next('one');}, 250);setTimeout(()=>{源$.next('二');}, 1000);setTimeout(()=>{源$.next('三');}, 1250);setTimeout(()=>{源$.next('四');}, 1750);//订阅和取消订阅const 订阅A = 结果$.subscribe(value => console.log('A', value));setTimeout(()=>{结果$.subscribe(value => console.log('B', value));}, 500);setTimeout(()=>{结果$.subscribe(value => console.log('C', value));}, 1000);setTimeout(()=>{subscriptionA.unsubscribe();}, 1500);//完整源码setTimeout(()=>{源$.complete();}, 2000);函数 customOperator(onCountUpdate = noop) {返回函数 refCountOperatorFunction(source$) {让计数器 = 0;返回延迟(()=>{计数器++;onCountUpdate(计数器);返回来源$;}).管道(完成(()=>{柜台 - ;onCountUpdate(计数器);}));};}函数 noop(){}
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>
* 注意:如果你的 source$ 很冷——你可能需要分享它.>
希望能帮到你
I'm using an Observable
to provide event subscription interface for clients from a global resource, and I need to manage that resource according to the number of active subscriptions:
- Allocate global resource when the number of subscriptions becomes greater than 0
- Release global resource when the number of subscriptions becomes 0
- Adjust the resource usage strategy based on the number of subscriptions
What is the proper way in RXJS to monitor the number of active subscriptions?
How to implement the following within RXJS syntax? -
const myEvent: Observable<any> = new Observable();
myEvent.onSubscription((newCount: number, prevCount: number) => {
if(newCount === 0) {
// release global resource
} else {
// allocate global resource, if not yet allocated
}
// for a scalable resource usage / load,
// re-configure it, based on newCount
});
I wouldn't expect a guaranteed notification on each change, hence newCount
+ prevCount
params.
UPDATE-1
This is not a duplicate to this, because I need to be notified when the number of subscriptions changes, and not just to get the counter at some point.
UPDATE-2
Without any answer so far, I quickly came up with a very ugly and limited work-around, through complete incapsulation, and specifically for type Subject
. Hoping very much to find a proper solution.
UPDATE-3
After a few answers, I'm still not sure how to implement what I'm trying, which is the following:
class CustomType {
}
class CountedObservable<T> extends Observable<T> {
private message: string; // random property
public onCount; // magical Observable that needs to be implemented
constructor(message: string) {
// super(); ???
this.message = message;
}
// random method
public getMessage() {
return this.message;
}
}
const a = new CountedObservable<CustomType>('hello'); // can create directly
const msg = a.getMessage(); // can call methods
a.subscribe((data: CustomType) => {
// handle subscriptions here;
});
// need that magic onCount implemented, so I can do this:
a.onCount.subscribe((newCount: number, prevCont: number) => {
// manage some external resources
});
How to implement such class CountedObservable
above, which would let me subscribe to itself, as well as its onCount
property to monitor the number of its clients/subscriptions?
UPDATE-4
All suggested solutions seemed overly complex, and even though I accepted one of the answers, I ended up with a completely custom solution one of my own.
You could achieve it using defer to track subscriptions and finalize to track completions, e.g. as an operator:
// a custom operator that will count number of subscribers
function customOperator(onCountUpdate = noop) {
return function refCountOperatorFunction(source$) {
let counter = 0;
return defer(()=>{
counter++;
onCountUpdate(counter);
return source$;
})
.pipe(
finalize(()=>{
counter--;
onCountUpdate(counter);
})
);
};
}
// just a stub for `onCountUpdate`
function noop(){}
And then use it like:
const source$ = new Subject();
const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);
Heres a code snippet illustrating this:
const { Subject, of, timer, pipe, defer } = rxjs;
const { finalize, takeUntil } = rxjs.operators;
const source$ = new Subject();
const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);
// emit events
setTimeout(()=>{
source$.next('one');
}, 250);
setTimeout(()=>{
source$.next('two');
}, 1000);
setTimeout(()=>{
source$.next('three');
}, 1250);
setTimeout(()=>{
source$.next('four');
}, 1750);
// subscribe and unsubscribe
const subscriptionA = result$
.subscribe(value => console.log('A', value));
setTimeout(()=>{
result$.subscribe(value => console.log('B', value));
}, 500);
setTimeout(()=>{
result$.subscribe(value => console.log('C', value));
}, 1000);
setTimeout(()=>{
subscriptionA.unsubscribe();
}, 1500);
// complete source
setTimeout(()=>{
source$.complete();
}, 2000);
function customOperator(onCountUpdate = noop) {
return function refCountOperatorFunction(source$) {
let counter = 0;
return defer(()=>{
counter++;
onCountUpdate(counter);
return source$;
})
.pipe(
finalize(()=>{
counter--;
onCountUpdate(counter);
})
);
};
}
function noop(){}
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>
* NOTE: if your source$ is cold — you might need to share it.
Hope it helps
这篇关于如何监控RXJS订阅的数量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!