问题描述
在简化的情况下,我想将WebSocket客户端发送的消息广播给所有其他客户端.该应用程序是使用Spring的反应式Websockets构建的.
In my simplified case I want to broadcast a message sent by WebSocket client to all other clients. The application is built using reactive websockets with Spring.
我的想法是使用单身Sink
,如果从客户端收到消息,则在此接收器上发出它. WebsocketSession::send
只是将此Sink
发出的事件转发到连接的客户端.
My idea was to use singleSink
and if a message is received from the client, emit it on this sink. WebsocketSession::send
just forwards events emitted by this Sink
to connected clients.
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })
return Mono.zip(input, output).then()
}
fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)
fun <T> fromJson(json : String, clazz : Class<T>) : T{
return objectMapper.readValue(json, clazz)
}
}
此实现并不安全,因为可以从不同的线程调用Sink.emitNext
.
This implementation is not safe as Sink.emitNext
can be called from different threads.
我的尝试是使用publishOn
并传递单线程的Scheduler
,以便从单个线程调用所有WebSocketSession
的onNext
.然而这是行不通的.从websocket客户端发出一项,然后所有后续的websocket客户端在连接后立即收到onClose事件:
My attempt was to use publishOn
and pass a singled threaded Scheduler
so that onNext
for all WebSocketSession
s is called from a single thread. Howeverthis does not work. One item is emitted from a websocket client and then all subsequent websocket clients receive onClose event immediately after connection :
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val scheduler = Schedulers.newSingle("sink-scheduler")
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.publishOn(scheduler) // publish on single threaded scheduler
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
...
}
}
我可以看到的另一种选择是在某个通用锁上使用synchronize
,以便发出线程安全:
Another option which I could see is to synchronize
on some common lock so that emission is thread safe :
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val lock = Any()
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
synchronized(lock) {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
}
.then()
...
}
}
但是我不确定是否应该那样做.
However I am not sure if this should be done like that.
在这种情况下是否可以使用publishOn
以便发射是线程安全的,如果不是,则该问题的其他解决方案是什么(除了像我对synchronized
关键字所做的那样使用同步之外).
Is it possible to use publishOn
in this case so that emission is thread safe and if not what is other solution to this problem (apart of using synchronization like I have done with synchronized
keyword).
推荐答案
除了使用synchronized
选项进行悲观锁定之外,您还可以创建一个与FAIL_FAST
相当的EmitFailureHandler
,但它会为EmitResult.NON_SERIALIZED_ACCESS
返回true
Instead of pessimistic locking with the synchronized
option, you could create an EmitFailureHandler
comparable to FAIL_FAST
except it returns true
for EmitResult.NON_SERIALIZED_ACCESS
.
这将导致并发的发射尝试立即重试,例如在繁忙的循环中.
This would result in the concurrent emit attempts to be immediately retried, like in a busy loop.
乐观地说,这将最终成功.如果您想对无限循环采取额外的防御措施,甚至可以使自定义处理程序引入延迟或限制其返回true
的次数.
Optimistically, this will end up succeeding. You can even make the custom handler introduce a delay or limit the number of times it returns true
if you want to be extra defensive against infinite loops.
这篇关于如何从Spring WebFlux中的多个Flux(WebsocketSession :: receive)正确向Sink发射值?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!