我相当习惯 RX 在 .NET 和 Java 中使用它,我希望能够做到以下几点:

Rx.Observable.fromCallback(websocket.onmessage)
    .map(...)
    .subscribe(...);

但是,控制台具有以下内容:
Uncaught TypeError: Rx.Observable.fromCallback(websocket.onmessage).map is not a function
这似乎表明 fromCallback 没有返回 Observable。

我在这里做错了什么?我是否误解了 fromCallback 正在做什么,我需要使用 Subject 吗?我不能在 observable 中包装一些任意的处理程序吗?

最佳答案

您实际上是在寻找 fromEvent fromEventPattern :

Rx.Observable.fromEvent(websocket, 'message').map(/*...*/).subscribe();

Rx.Observable.fromEventPattern(
  function add(h) { websocket.addEventListener(h); },
  function remove(h) { websocket.removeEventListener(h); })
 .map(/*...*/)
 .subscribe();

第一个将尝试使用订阅事件发射器的一些标准方式,WebSocket 就是这样。但是,如果失败,您可以使用 fromEventPattern 来指定如何从对象中添加或删除处理程序。

另一个注意事项,JavaScript 不会像 C# 和 Java 那样传递对您正在使用的对象实例的隐式引用,因此您的代码 fromCallback(websocket.onmessage) 不会传递 websocket ,而是传递对函数原型(prototype)中方法的引用. this 将在执行时确定。
Rx.Observable.fromCallback 用于最后一个参数是回调函数的函数,这是异步 JavaScript 代码的标准模式。此外,fromCallback 方法不返回 Observable 它返回一个函数,当调用时返回一个 Observable
function methodWithCallback(arg0, arg1, cb) {
  setTimeout(function() {
    cb(arg0 + arg1);
  }, 2000);
}

var newMethod = Rx.Observable.fromCallback(methodWithCallback);

//[After 2 seconds] 3
newMethod(1, 2).subscribe(console.log.bind(console));

10-06 03:17