我正在使用Spring WebFlux WebSocketClient订阅并处理来自远程Web套接字的消息。在处理期间,来自远程套接字的消息流量有时会意外地完成(或在错误时终止),从而导致执行Web套接字客户端的onComplete(或onError)回调。发生这种情况时,我的onCompleteonError回调将发布一个事件。事件侦听器通过调用创建另一个Web套接字客户端的函数进行响应,该客户端连接到相同的外部Web套接字,并且套接字处理重新开始。

我的问题是,在客户端完成处理后,我无法弄清楚如何释放WebSocketClient资源。这导致未使用的线程在JVM中累积。特别是第一个WebSocketClient在其上运行的线程(WebSocketClient-SecureIO-1WebSocketClient-SecureIO-2parallel-1)保持等待状态,并为新的“ WebSocketClient”启动新的线程。我以为在close()上调用WebSocketSession可以解决问题,但是不能。

我的实现模式是:

public void startProcessing() {
   WebSocketClient client = new StandardWebSocketClient();
   Mono<String> subscribeMsg = Mono.just("...");

   client
      .execute(uri, webSocketSession ->
          webSocketSession
             .send(subscribeMsg.map(webSocketSession::textMessage))
             .thenMany(webSocketSession.receive())
             .map(webSocketMessage -> ...)
             .buffer(Duration.ofSeconds(bufferDuration))
             .doOnNext(handler)
             .doOnComplete(() -> webSocketSession.close())
             .then())
       .subscribe(
            aVoid -> LOGGER.info("subscription started"),
            throwable -> {... publish restart event ...},
            () -> {... publish restart event ...});
}

public void restartEventListener() {
    startProcessing();
}


关于如何防止未使用的WebSocketClient线程在JVM中累积的任何建议?

最佳答案

一些想法:

WebSocketClient正在池化资源,因此您应将同一客户端重用于许多请求。

您应该避免在doOn*运算符内进行处理。这些是副作用运算符,在当前Scheduler上同步执行。为了提高效率,您应该使用其他运算符。您可以将websocket消息映射到Flux<DataBuffer>,然后使用DataBufferUtils::write将其写入文件,并且仍然利用相同的反应式管道,而不使用副作用运算符。

在其中之一中关闭websocket会话不是一个坏主意,尽管我会使用doOnTerminate,它在成功和错误情况下都会触发。

另外,我不了解发布事件以重新启动处理阶段的目标。使用retryrepeat运算符以及相同的客户端应该可以正常工作并提高效率。

09-10 09:41