本文介绍了如何使用 rxjs 定期检查实时连接?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 rxjs 来处理 websocket 连接

I use rxjs to handle a websocket connection

var socket = Rx.Observable.webSocket('wss://echo.websocket.org')
socket.resultSelector = (e) => e.data

我想定期 (5s) 发送一个 ping 消息并等待 3s 以接收 pong 响应并订阅 a 流,如果没有收到响应.

I want to periodically (5s) sent a ping message and wait 3s to receive a pong response and subscribe to the a stream if no response has been receive.

我尝试过,但没有成功.我承认我对所有可用于处理超时、去抖动或节流的操作员感到有点迷茫.

I try that without success. I admit I'm a bit lost will all the operator available to handle timeout, deboune or throttle.

// periodically send a ping message
const ping$ = Rx.Observable.interval(2000)
  .timeInterval()
  .do(() => socket.next('ping'))

const pong$ = socket
  .filter(m => /^ping$/.test(`${m}`))
  .mergeMap(
    ping$.throttle(2000).map(() => Observable.throw('pong timeout'))
  )

 pong$.subscribe(
   (msg) => console.log(`end ${msg}`),
   (err) => console.log(`err ${err}`),
   () => console.log(`complete`)
 )

但不幸的是,没有发送 ping.

But unfortunately, no ping are send.

我也尝试过使用但没有成功.

I've also try to achieved that using without success.

const ping$ = Rx.Observable.interval(2000)
  .timeInterval()
  .do(() => socket.next('ping'))


const pong$ = socket
  .filter(m => /^ping$/.test(`${m}`))

const heartbeat$ = ping$
  .debounceTime(5000)
  .mergeMap(() => Rx.Observable.timer(5000).takeUntil(pong$))

heartbeat$.subscribe(
  (msg) => console.log(`end ${msg}`),
  (err) => console.log(`err ${err}`),
  () => console.log(`complete`)
)

任何帮助表示赞赏.

推荐答案

您可以使用 race() 运算符始终只连接到最先发出的 Observable:

You can use race() operator to always connect only to the Observable that emits first:

function sendMockPing() {
  // random 0 - 5s delay
  return Observable.of('pong').delay(Math.random() * 10000 / 2);
}

Observable.timer(0, 5000)
  .map(i => 'ping')
  .concatMap(val => {
    return Observable.race(
      Observable.of('timeout').delay(3000),
      sendMockPing()
    );
  })
  //.filter(response => response === 'timeout') // remove all successful responses
  .subscribe(val => console.log(val));

查看现场演示:https://jsbin.com/lavinah/6/edit?js,控制台

这随机模拟响应时间为 0 - 5s.当响应时间超过 3 秒时 Observable.of('timeout').delay(3000) 首先完成并且 timeout 字符串被 传递给它的观察者concatMap().

This randomly simulates response taking 0 - 5s. When the response takes more than 3s than Observable.of('timeout').delay(3000) completes first and the timeout string is passed to its observer by concatMap().

这篇关于如何使用 rxjs 定期检查实时连接?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-01 19:40