问题描述
我使用 rxjs 来处理 websocket 连接
I use rxjs to handle a websocket connection
var socket = Rx.Observable.webSocket('wss://echo.websocket.org')
socket.resultSelector = (e) => e.data
我想定期 (5s) 发送一个 ping
消息并等待 3s 以接收 pong
响应并订阅 a 流,如果没有收到响应.
I want to periodically (5s) sent a ping
message and wait 3s to receive a pong
response and subscribe to the a stream if no response has been receive.
我尝试过,但没有成功.我承认我对所有可用于处理超时、去抖动或节流的操作员感到有点迷茫.
I try that without success. I admit I'm a bit lost will all the operator available to handle timeout, deboune or throttle.
// periodically send a ping message
const ping$ = Rx.Observable.interval(2000)
.timeInterval()
.do(() => socket.next('ping'))
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
.mergeMap(
ping$.throttle(2000).map(() => Observable.throw('pong timeout'))
)
pong$.subscribe(
(msg) => console.log(`end ${msg}`),
(err) => console.log(`err ${err}`),
() => console.log(`complete`)
)
但不幸的是,没有发送 ping.
But unfortunately, no ping are send.
我也尝试过使用但没有成功.
I've also try to achieved that using without success.
const ping$ = Rx.Observable.interval(2000)
.timeInterval()
.do(() => socket.next('ping'))
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
const heartbeat$ = ping$
.debounceTime(5000)
.mergeMap(() => Rx.Observable.timer(5000).takeUntil(pong$))
heartbeat$.subscribe(
(msg) => console.log(`end ${msg}`),
(err) => console.log(`err ${err}`),
() => console.log(`complete`)
)
任何帮助表示赞赏.
推荐答案
您可以使用 race()
运算符始终只连接到最先发出的 Observable:
You can use race()
operator to always connect only to the Observable that emits first:
function sendMockPing() {
// random 0 - 5s delay
return Observable.of('pong').delay(Math.random() * 10000 / 2);
}
Observable.timer(0, 5000)
.map(i => 'ping')
.concatMap(val => {
return Observable.race(
Observable.of('timeout').delay(3000),
sendMockPing()
);
})
//.filter(response => response === 'timeout') // remove all successful responses
.subscribe(val => console.log(val));
查看现场演示:https://jsbin.com/lavinah/6/edit?js,控制台
这随机模拟响应时间为 0 - 5s
.当响应时间超过 3 秒时 Observable.of('timeout').delay(3000)
首先完成并且 timeout
字符串被 传递给它的观察者concatMap()
.
This randomly simulates response taking 0 - 5s
. When the response takes more than 3s than Observable.of('timeout').delay(3000)
completes first and the timeout
string is passed to its observer by concatMap()
.
这篇关于如何使用 rxjs 定期检查实时连接?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!