问题描述
下面的代码成功建立了一个websocket连接.
websockets 服务器(也是 akk-http)使用 Andrew 在此处建议的答案故意关闭连接.
下面的 SinkActor
收到一条 akka.actor.Status.Failure
类型的消息,所以我知道从服务器到客户端的消息流已经中断.>
我的问题是...我的客户端应该如何重新建立 websocket 连接?source.via(webSocketFlow).to(sink).run()
完成了吗?
清理资源和重试 websocket 连接的最佳实践是什么?
class ConnectionAdminActor 使用 ActorLogging { 扩展 Actor隐式 val 系统:ActorSystem = context.system隐式 val flowMaterializer = ActorMaterializer()private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {参考 =>{自己 !StartupWithActor(ref.path)参考}}私有 val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))来源.via(webSocketFlow).下沉).跑()
尝试 recoverWithRetries
组合器 (docs 此处).
这允许您提供替代的 Source
您的管道将切换到,以防上游失败.在最简单的情况下,您可以重复使用相同的 Source
,它应该会发出一个新的连接.
val wsSource = source via webSocketFlow来源.recoverWithRetries(attempts = -1, {case e: Throwable => wsSource}).下沉)
注意
attempts = -1
将无限重试重新连接- 部分函数允许更精细地控制哪些异常可以触发重新连接
The code below successfully establishes a websocket connection.
The websockets server (also akk-http) deliberately closes the connection using Andrew's suggested answer here.
The SinkActor
below receives a message of type akka.actor.Status.Failure
so I know that the flow of messages from Server to Client has been disrupted.
My question is ... How should my client reestablish the websocket connection? Has source.via(webSocketFlow).to(sink).run()
completed?
What is best practice for cleaning up the resources and retrying the websocket connection?
class ConnectionAdminActor extends Actor with ActorLogging {
implicit val system: ActorSystem = context.system
implicit val flowMaterializer = ActorMaterializer()
private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")
private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)
private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {
ref => {
self ! StartupWithActor(ref.path)
ref
}
}
private val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))
source
.via(webSocketFlow)
.to(sink)
.run()
Try the recoverWithRetries
combinator (docs here).
This allows you to provide an alternative Source
your pipeline will switch to, in case the upstream has failed. In the most simple case, you can just re-use the same Source
, which should issue a new connection.
val wsSource = source via webSocketFlow
wsSource
.recoverWithRetries(attempts = -1, {case e: Throwable => wsSource})
.to(sink)
Note that
- the
attempts = -1
will retry to reconnect indefinetely - the partial function allows for more granular control over which exception can trigger a reconnect
这篇关于如何在断开连接后清理 akka-http websocket 资源然后重试?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!