我正在尝试创建一个Kotlin服务,该服务能够将数据从Kafka传递到(特定的)WebSocket连接。
例如,如果来自用户的数据在Kafka上传递,则程序会接收到该数据,如果当前已连接该用户,我想将其传递给正确的WebSocket连接。

我拥有的基本知识是:

@KafkaListener()
@Controller
@Secured(SecurityRule.IS_ANONYMOUS)
@ServerWebSocket("/ws/{id}")
class WebSocket() {

    @OnOpen
    fun onOpen(session: WebSocketSession, id: String): Publisher<String> {
        return session.send("connection opened")
    }
}


但是,如果在同一个班级中我收到一个Kafka消息,其中包含我要查找的用户ID,如何将其传递给正确的WebSocket连接?

我想出了如果我从URL中使用"/ws/{id}"可以轻松地将其发送到正确的ID,但是我不知道如何实现。

我知道将数据发送到连接的唯一方法是从websocket注释的函数(例如OnOpen和OnMessage等)内部。
此外,我发现Micronaut WebSocketBroadcaster很有用,但仅用于广播。

我想找出的另一点是在哪里确保连接到的ID也是用户的实际ID,而不是其他人的实际ID,我应该在@Secured注释中实现它吗?

最佳答案

对于任何有兴趣的人,我的确找到了解决方案。
使用Micronaut WebSocketBroadcaster,您可以广播消息,并使用第二个参数,该参数采用Predicate<WebSocketSession>,它定义了将消息发送到的WebSocketSessions。

如果您确实知道特定的WebSocketSession ID,则可以简单地将{ it.id == "id" }用作谓词,如示例1所示。
如果确实需要根据其他一些属性将其发送给特定用户,则可以使用oncc函数中的session.attributes.put("id", id)将属性分配给WebSocketSession本身。可以在示例2中使用的谓词是,关于此的唯一事情是,在没有将其强制转换为地图的情况下,我找不到一种检索属性的方法,这不是很漂亮。

class ExampleClass(val broadcaster: WebSocketBroadcaster) {
    val userId = "user_001"
    val webSocketId = "asdasdasd"

    // [1] Send message to a specific socket ID
    broadcaster.broadcast("message", { it.id == webSocketId)

    // [2] Send message to a user with specific ID or any other identifying attribute of choice
    broadcaster.broadcast("message", { it.attributes.asMap()["id"]?.equals(id) ?: false})
}

10-06 13:31
查看更多