使用 RxJS 对 WebSocket 流进行建模的方法有哪些。
我看到的最明显的东西是套接字流,它发出消息流。
如果我经常创建一个套接字流,我如何创建他们的消息流并仍然保留谁发送了这些消息?
套接字流是我的第一步:
const socket$ = Observable.create(({complete, next}) => {
const server = new WebSocketServer({server: someHttpServer})
server.on('connection', next)
return () => {
server.close()
complete()
}
})
但是消息流有点困难,因为我需要从中获取消息的套接字。
这是我对建模的第一次天真尝试:
const message$ = socket$.flatMap(socket => Observable.create(({complete, next}) => {
socket.on('message', next)
socket.on('close', complete)
return () => socket.close()
})).share()
从所有套接字流式传输所有套接字消息的可观察对象。但是如果我订阅它,我就没有套接字了,这使得它是单向的。
我想
socket$ -> message$ -> server-processing -> socket$
但是响应有多种用例,广播、多播和单播。
最佳答案
我发现 flatMap
需要第二个函数,它接收 flatMap
的参数值和 flatMap
的(扁平化的)返回值。此函数可以返回一个新值,用于所有后续运算符。
const socketMessage$ = socket$.flatMap(
socket => Observable.create(({complete, next}) => {
socket.on('message', next)
socket.on('close', complete)
return () => socket.disconnect()
}),
(socket, message) => ({socket, message})
).share()
关于javascript - 使用 RxJS 对 WebSocket 流进行建模,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37378294/