使用 RxJS 对 WebSocket 流进行建模的方法有哪些。

我看到的最明显的东西是套接字流,它发出消息流。

如果我经常创建一个套接字流,我如何创建他们的消息流并仍然保留谁发送了这些消息?

套接字流是我的第一步:

const socket$ = Observable.create(({complete, next}) => {

  const server = new WebSocketServer({server: someHttpServer})

  server.on('connection', next)

  return () => {
    server.close()
    complete()
  }

})

但是消息流有点困难,因为我需要从中获取消息的套接字。

这是我对建模的第一次天真尝试:
const message$ = socket$.flatMap(socket => Observable.create(({complete, next}) => {

  socket.on('message', next)
  socket.on('close', complete)

  return () => socket.close()

})).share()

从所有套接字流式传输所有套接字消息的可观察对象。但是如果我订阅它,我就没有套接字了,这使得它是单向的。

我想
socket$ -> message$ -> server-processing -> socket$

但是响应有多种用例,广播、多播和单播。

最佳答案

我发现 flatMap 需要第二个函数,它接收 flatMap 的参数值和 flatMap 的(扁平化的)返回值。此函数可以返回一个新值,用于所有后续运算符。

const socketMessage$ = socket$.flatMap(

  socket => Observable.create(({complete, next}) => {

    socket.on('message', next)
    socket.on('close', complete)

    return () => socket.disconnect()

  }),

  (socket, message) => ({socket, message})

).share()

关于javascript - 使用 RxJS 对 WebSocket 流进行建模,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37378294/

10-12 19:33