我正在使用Spring WebFlux WebSocketClient
订阅并处理来自远程Web套接字的消息。在处理期间,来自远程套接字的消息流量有时会意外地完成(或在错误时终止),从而导致执行Web套接字客户端的onComplete
(或onError
)回调。发生这种情况时,我的onComplete
和onError
回调将发布一个事件。事件侦听器通过调用创建另一个Web套接字客户端的函数进行响应,该客户端连接到相同的外部Web套接字,并且套接字处理重新开始。
我的问题是,在客户端完成处理后,我无法弄清楚如何释放WebSocketClient
资源。这导致未使用的线程在JVM中累积。特别是第一个WebSocketClient
在其上运行的线程(WebSocketClient-SecureIO-1
,WebSocketClient-SecureIO-2
和parallel-1
)保持等待状态,并为新的“ WebSocketClient”启动新的线程。我以为在close()
上调用WebSocketSession
可以解决问题,但是不能。
我的实现模式是:
public void startProcessing() {
WebSocketClient client = new StandardWebSocketClient();
Mono<String> subscribeMsg = Mono.just("...");
client
.execute(uri, webSocketSession ->
webSocketSession
.send(subscribeMsg.map(webSocketSession::textMessage))
.thenMany(webSocketSession.receive())
.map(webSocketMessage -> ...)
.buffer(Duration.ofSeconds(bufferDuration))
.doOnNext(handler)
.doOnComplete(() -> webSocketSession.close())
.then())
.subscribe(
aVoid -> LOGGER.info("subscription started"),
throwable -> {... publish restart event ...},
() -> {... publish restart event ...});
}
public void restartEventListener() {
startProcessing();
}
关于如何防止未使用的
WebSocketClient
线程在JVM中累积的任何建议? 最佳答案
一些想法:WebSocketClient
正在池化资源,因此您应将同一客户端重用于许多请求。
您应该避免在doOn*
运算符内进行处理。这些是副作用运算符,在当前Scheduler
上同步执行。为了提高效率,您应该使用其他运算符。您可以将websocket消息映射到Flux<DataBuffer>
,然后使用DataBufferUtils::write
将其写入文件,并且仍然利用相同的反应式管道,而不使用副作用运算符。
在其中之一中关闭websocket会话不是一个坏主意,尽管我会使用doOnTerminate
,它在成功和错误情况下都会触发。
另外,我不了解发布事件以重新启动处理阶段的目标。使用retry
和repeat
运算符以及相同的客户端应该可以正常工作并提高效率。