问题描述
在我的场景中,客户端发送再见"websocket 消息,我需要关闭之前在服务器端建立的连接.
In my scenario, a client sends "goodbye" websocket message and I need to close previously established connection at the server side.
来自 akka-http 文档:
From akka-http docs:
通过取消来自服务器逻辑的传入连接流(例如,通过将其下游连接到 Sink.cancelled 并将其上游连接到 Source.empty),可以关闭连接.也可以通过取消 IncomingConnection 源连接来关闭服务器的套接字.
但考虑到 Sink
和 Source
在协商新连接时设置一次,我不清楚如何做到这一点:
But it's not clear to me how to do that taking into account that Sink
and Source
are set once when negotiating a new connection:
(get & path("ws")) {
optionalHeaderValueByType[UpgradeToWebsocket]() {
case Some(upgrade) ⇒
val connectionId = UUID()
complete(upgrade.handleMessagesWithSinkSource(sink, source))
case None ⇒
reject(ExpectedWebsocketRequestRejection)
}
}
推荐答案
提示:此答案基于 akka-stream-experimental
版本 2.0-M2
.API在其他版本中可能略有不同.
HINT: This answer is based on akka-stream-experimental
version 2.0-M2
. The API may be slightly different in other versions.
关闭连接的一种简单方法是使用 PushStage
:
An easy way to close the connection is by using a PushStage
:
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ⇒
// println("Connection closed")
ctx.finish()
case msg ⇒
ctx.push(msg)
}
}
在客户端或服务器端接收的每个元素(以及通常通过 Flow
的每个元素)都通过这样的 Stage
组件.在 Akka 中,完整的抽象被称为 GraphStage
,更多信息可以在 官方文档.
Every element that is received at the client side or at the server side (and in general every element that goes through a Flow
) goes through such a Stage
component. In Akka, the full abstraction is called GraphStage
, more information can be found in the official documentation.
使用 PushStage
,我们可以查看具体传入元素的值,然后相应地转换上下文.在上面的例子中,一旦收到 goodbye
消息,我们就完成上下文,否则我们只是通过 push
方法转发值.
With a PushStage
we can watch concrete incoming elements for their value and than transform the context accordingly. In the example above, once the goodbye
message is received we finish the context otherwise we just forward the value through the push
method.
现在,我们可以通过 transform
方法将 closeClient
组件连接到任意流:
Now, we can connect the closeClient
component to an arbitrary flow through the transform
method:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("
"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ closeClient)
.map(_ ⇒ StdIn.readLine("> "))
.map(_ + "
")
.map(ByteString(_))
connection.join(flow).run()
上面的流程接收到一个ByteString
并返回一个ByteString
,这意味着它可以通过joinconnection
/代码>方法.在流程内部,我们首先将字节转换为字符串,然后再将它们发送到 closeClient
.如果 PushStage
没有完成流,元素将在流中转发,在那里它被丢弃并替换为来自 stdin 的一些输入,然后通过线路发回.如果流完成,则舞台组件之后的所有进一步流处理步骤都将被删除 - 流现在已关闭.
The flow above receives a ByteString
and returns a ByteString
, which means it can be connected to connection
through the join
method. Inside of the flow we first convert the bytes to a string before we send them to closeClient
. If the PushStage
doesn't finish the stream, the element is forwarded in the stream, where it gets dropped and replaced by some input from stdin, which is then sent back over the wire. In case the stream is finished, all further stream processing steps after the stage component will be dropped - the stream is now closed.
这篇关于从服务器关闭 akka-http websocket 连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!