我试图用rxjs的可观察性来包装一个基于事件的api,但是我不知道如何在取消订阅后清理自己。
这就是我将订阅与Observable和基于事件的API同步的方式:

interface SomeEventService{
    registerListener(messageName: string, messageContent: Something): Listener;
    unregisterListener(listener: Listener);
}

class MyWrapper {
    private eventService: SomeEventService;

    streamOfSomething$(): Observable<Something> {
        return Observable.create((observer: Observer<Something>) => {
            // Create a listener, and relay events to the Observable
            const listener = this.eventService.registerListener("something", something => {
                observer.next(something);
            })
            // Now what?
        })
    }
}

这里的问题是,在将来的某个时候,我想调用eventService.unregisterListener,例如当streamOfSomething$的消费者取消订阅流时。
问题是:我该怎么做?据我所知,在发生取消订阅后,没有可以在observer上使用的事件或回调来运行代码。

最佳答案

class MyWrapper {
    private eventService: SomeEventService;

    streamOfSomething$(): Observable<Something> {
        return Observable.create((observer: Observer<Something>) => {
            // Create a listener, and relay events to the Observable
            const listener = this.eventService.registerListener("something", something => {
                observer.next(something);
            })
            // Now what?
             return () => { this.eventService.unRegisterListener(); };
        })
    }
}

当用户调用unsubscribe()时,将调用您将返回的函数

10-07 14:50