WebFlux中的多个Flux

WebFlux中的多个Flux

本文介绍了如何从Spring WebFlux中的多个Flux(WebsocketSession :: receive)正确向Sink发射值?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在简化的情况下,我想将WebSocket客户端发送的消息广播给所有其他客户端.该应用程序是使用Spring的反应式Websockets构建的.

In my simplified case I want to broadcast a message sent by WebSocket client to all other clients. The application is built using reactive websockets with Spring.

我的想法是使用单身Sink,如果从客户端收到消息,则在此接收器上发出它. WebsocketSession::send只是将此Sink发出的事件转发到连接的客户端.

My idea was to use singleSink and if a message is received from the client, emit it on this sink. WebsocketSession::send just forwards events emitted by this Sink to connected clients.

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })

        return Mono.zip(input, output).then()
    }

    fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)

    fun <T> fromJson(json : String, clazz : Class<T>) : T{
        return objectMapper.readValue(json, clazz)
    }

}

此实现并不安全,因为可以从不同的线程调用Sink.emitNext.

This implementation is not safe as Sink.emitNext can be called from different threads.

我的尝试是使用publishOn并传递单线程的Scheduler,以便从单个线程调用所有WebSocketSessiononNext.然而这是行不通的.从websocket客户端发出一项,然后所有后续的websocket客户端在连接后立即收到onClose事件:

My attempt was to use publishOn and pass a singled threaded Scheduler so that onNext for all WebSocketSessions is called from a single thread. Howeverthis does not work. One item is emitted from a websocket client and then all subsequent websocket clients receive onClose event immediately after connection :

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val scheduler = Schedulers.newSingle("sink-scheduler")

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .publishOn(scheduler) // publish on single threaded scheduler
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        ...
    }

}

我可以看到的另一种选择是在某个通用锁上使用synchronize,以便发出线程安全:

Another option which I could see is to synchronize on some common lock so that emission is thread safe :

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val lock = Any()

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    synchronized(lock) {
                        sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                    }
                }
                .then()
        ...
    }


}

但是我不确定是否应该那样做.

However I am not sure if this should be done like that.

在这种情况下是否可以使用publishOn以便发射是线程安全的,如果不是,则该问题的其他解决方案是什么(除了像我对synchronized关键字所做的那样使用同步之外).

Is it possible to use publishOn in this case so that emission is thread safe and if not what is other solution to this problem (apart of using synchronization like I have done with synchronized keyword).

推荐答案

除了使用synchronized选项进行悲观锁定之外,您还可以创建一个与FAIL_FAST相当的EmitFailureHandler,但它会为EmitResult.NON_SERIALIZED_ACCESS返回true

Instead of pessimistic locking with the synchronized option, you could create an EmitFailureHandler comparable to FAIL_FAST except it returns true for EmitResult.NON_SERIALIZED_ACCESS.

这将导致并发的发射尝试立即重试,例如在繁忙的循环中.

This would result in the concurrent emit attempts to be immediately retried, like in a busy loop.

乐观地说,这将最终成功.如果您想对无限循环采取额外的防御措施,甚至可以使自定义处理程序引入延迟或限制其返回true的次数.

Optimistically, this will end up succeeding. You can even make the custom handler introduce a delay or limit the number of times it returns true if you want to be extra defensive against infinite loops.

这篇关于如何从Spring WebFlux中的多个Flux(WebsocketSession :: receive)正确向Sink发射值?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-24 09:39