TL; DR;

我们正在尝试使用spring webflux WebSocket实现设计WebSocket服务器。该服务器具有常规的HTTP服务器操作,例如create/fetch/update/fetchall。使用WebSocket,我们试图公开一个端点,以便客户端可以将单个连接用于所有类型的操作,因为WebSocket就是为此目的而设计的。使用webflux和WebSockets是否是正确的设计?

长版

我们正在启动一个项目,该项目将使用spring-webflux中的响应式(Reactive)Web套接字。我们需要构建一个响应式(Reactive)客户端库,供消费者使用以连接到服务器。

在服务器上,我们得到一个请求,阅读一条消息,保存并返回静态响应:

public Mono<Void> handle(WebSocketSession webSocketSession) {
    Flux<WebSocketMessage> response = webSocketSession.receive()
            .map(WebSocketMessage::retain)
            .concatMap(webSocketMessage -> Mono.just(webSocketMessage)
                    .map(parseBinaryToEvent) //logic to get domain object
                    .flatMap(e -> service.save(e))
                    .thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
            );

    return webSocketSession.send(response);
}

在客户端上,我们希望在有人调用save方法并从server返回响应时进行 call 。
public Mono<String> save(Event message) {
    new ReactorNettyWebSocketClient().execute(uri, session -> {
      session
              .send(Mono.just(session.binaryMessage(formatEventToMessage)))
              .then(session.receive()
                      .map(WebSocketMessage::getPayloadAsText)
                      .doOnNext(System.out::println).then()); //how to return this to client
    });
    return null;
}

我们不确定如何进行此设计。理想情况下,我们认为应该

1)client.execute仅应调用一次,并以某种方式保留session。应该在随后的调用中使用相同的 session 来发送数据。

2)如何从我们从session.receive中获得的服务器返回响应?

3)如果在fetch中响应很大(不仅是静态字符串而是事件列表)时,使用session.receive怎么办?

我们正在做一些研究,但我们找不到在线的webflux-websocket-client文档/实现适当的资源。关于如何前进的任何指示。

最佳答案

请!使用RSocket!

这是绝对正确的设计,值得节省资源,并且每个客户端仅对所有可能的操作使用一个连接。

但是,不要实现转轮,而要使用可以为您提供所有此类通信的协议(protocol)。

  • RSocket具有一个请求-响应模型,该模型允许您执行当今最常见的客户端-服务器交互。
  • RSocket具有请求流通信模型,因此您可以满足您的所有需求,并异步重用同一连接以返回事件流。 RSocket可以将逻辑流映射到物理连接并进行映射,因此您自己不会感到痛苦。
  • RSocket具有更多的交互模型,例如
    抛弃式的和流式的流在以下情况下可能有用
    通过两种方式发送数据流。

  • 如何在Spring中使用RSocket

    一种选择是使用RSocket协议(protocol)的RSocket-Java实现。 RSocket-Java构建在Project Reactor之上,因此自然适合Spring WebFlux生态系统。

    不幸的是,没有与Spring生态系统的功能集成。幸运的是,我花了几个小时来提供一个简单的RSocket Spring Boot Starter,它可以将Spring WebFlux与RSocket集成在一起,并将WebSocket RSocket服务器与WebFlux Http服务器一起公开。

    为什么RSocket是更好的方法?

    基本上,RSocket隐藏了自己实现相同方法的复杂性。使用RSocket,我们不必将交互模型定义作为自定义协议(protocol)和Java实现来考虑。 RSocket为我们完成了将数据传送到特定逻辑 channel 的工作。它提供了一个内置客户端,该客户端将消息发送到相同的WS连接,因此我们不必为此发明定制的实现。

    使用RSocket-RPC使其变得更好

    由于RSocket只是一个协议(protocol),它不提供任何消息格式,因此此挑战是针对业务逻辑的。但是,有一个RSocket-RPC项目,它提供 Protocol Buffer 作为消息格式,并重复使用与GRPC相同的代码生成技术。因此,使用RSocket-RPC,我们可以轻松地为客户端和服务器构建API,而根本不关心传输和协议(protocol)抽象。

    相同的RSocket Spring Boot集成还提供了RSocket-RPC用法的example

    好吧,它并没有说服我,我仍然想拥有一个自定义WebSocket服务器

    因此,为此目的,您必须自己实现。我之前已经做过一次,但是由于该项目属于企业级项目,因此我无法指出。
    不过,我可以共享一些代码示例,这些示例可以帮助您构建合适的客户端和服务器。

    服务器端

    处理程序和开放逻辑订户映射

    必须考虑的第一点是,一个物理连接内的所有逻辑流应存储在以下位置:
    class MyWebSocketRouter implements WebSocketHandler {
    
      final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;
    
    
      @Override
      public Mono<Void> handle(WebSocketSession session) {
        final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
        ...
      }
    }
    

    上面的示例中有两个 map 。第一个是路由映射,它使您可以根据传入的消息参数等来识别路由。第二个是为请求流用例创建的(在我的情况下是 Activity 订阅的映射),因此您可以发送创建订阅的消息帧,或者订阅您的特定操作并保留该订阅,这样一旦取消订阅操作已执行,如果有订阅,您将被取消订阅。

    使用处理器进行消息多路复用

    为了从所有逻辑流发送回消息,您必须将消息多路复用到一个流。例如,使用Reactor,可以使用UnicastProcessor做到这一点:
    @Override
    public Mono<Void> handle(WebSocketSession session) {
      final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
      ...
    
      return Mono
        .subscriberContext()
        .flatMap(context -> Flux.merge(
          session
            .receive()
            ...
            .cast(ActionMessage.class)
            .publishOn(Schedulers.parallel())
            .doOnNext(am -> {
              switch (am.type) {
                case CREATE:
                case UPDATE:
                case CANCEL: {
                  ...
                }
                case SUBSCRIBE: {
                  Flux<ResponseMessage<?>> flux = Flux
                    .from(
                      channelsMapping.get(am.getChannelId())
                                     .get(ActionMessage.Type.SUBSCRIBE)
                                     .handle(am) // returns Publisher<>
                    );
    
                  if (flux != null) {
                    channelsIdsToDisposableMap.compute(
                      am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed
                      (cid, disposable) -> {
                        ...
    
                        return flux
                          .subscriberContext(context)
                          .subscribe(
                            funIn::onNext, // send message to a Processor manually
                            e -> {
                              funIn.onNext(
                                new ResponseMessage<>( // send errors as a messages to Processor here
                                  0,
                                  e.getMessage(),
                                  ...
                                  ResponseMessage.Type.ERROR
                                )
                              );
                            }
                          );
                      }
                    );
                  }
    
                  return;
                }
                case UNSABSCRIBE: {
                  Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());
    
                  if (disposable != null) {
                    disposable.dispose();
                  }
                }
              }
            })
            .then(Mono.empty()),
    
            funIn
                ...
                .map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
                .as(session::send)
          ).then()
        );
    }
    

    从上面的示例中可以看到,那里有很多东西:
  • 消息中应包含路线信息
  • 消息应包含与其相关的唯一流ID。
  • 用于消息多路复用的独立处理器,其中错误也应该是消息
  • 每个 channel 都应存储在某个地方,在这种情况下,我们都有一个简单的用例,其中每个消息都可以提供消息的Flux或仅提供Mono(如果是mono,则可以在服务器端更简单地实现,因此您不必不必保留唯一的流ID)。
  • 此示例不包括消息编码/解码,因此您将面临这个挑战。

  • 客户端

    客户端也不是那么简单:

    处理 session

    为了处理连接,我们必须分配两个处理器,以便进一步使用它们来复用和解复用消息:
    UnicastProcessor<> outgoing = ...
    UnicastPorcessor<> incoming = ...
    (session) -> {
      return Flux.merge(
         session.receive()
                .subscribeWith(incoming)
                .then(Mono.empty()),
         session.send(outgoing)
      ).then();
    }
    

    将所有逻辑流放在某处

    所有创建的流,无论是Mono还是Flux,都应存储在某个地方,以便我们能够区分与哪个流消息相关:
    Map<String, MonoSink> monoSinksMap = ...;
    Map<String, FluxSink> fluxSinksMap = ...;
    

    自从MonoSink以来,我们必须保留两个映射,而FluxSink没有相同的父接口(interface)。

    邮件路由

    在以上示例中,我们仅考虑了客户端的初始部分。现在我们必须建立一个消息路由机制:
    ...
    .subscribeWith(incoming)
    .doOnNext(message -> {
        if (monoSinkMap.containsKey(message.getStreamId())) {
            MonoSink sink = monoSinkMap.get(message.getStreamId());
            monoSinkMap.remove(message.getStreamId());
            if (message.getType() == SUCCESS) {
                sink.success(message.getData());
            }
            else {
                sink.error(message.getCause());
            }
        } else if (fluxSinkMap.containsKey(message.getStreamId())) {
            FluxSink sink = fluxSinkMap.get(message.getStreamId());
            if (message.getType() == NEXT) {
                sink.next(message.getData());
            }
            else if (message.getType() == COMPLETE) {
                fluxSinkMap.remove(message.getStreamId());
                sink.next(message.getData());
                sink.complete();
            }
            else {
                fluxSinkMap.remove(message.getStreamId());
                sink.error(message.getCause());
            }
        }
    })
    

    上面的代码示例显示了如何路由传入的消息。

    多重请求

    最后一部分是消息多路复用。为此,我们将介绍可能的发件人类impl:
    class Sender {
        UnicastProcessor<> outgoing = ...
        UnicastPorcessor<> incoming = ...
    
        Map<String, MonoSink> monoSinksMap = ...;
        Map<String, FluxSink> fluxSinksMap = ...;
    
        public Sender () {
    

    //在此处创建websocket连接,并放置前面提到的代码
    }
        Mono<R> sendForMono(T data) {
            //generate message with unique
            return Mono.<R>create(sink -> {
                monoSinksMap.put(streamId, sink);
                outgoing.onNext(message); // send message to server only when subscribed to Mono
            });
        }
    
         Flux<R> sendForFlux(T data) {
             return Flux.<R>create(sink -> {
                fluxSinksMap.put(streamId, sink);
                outgoing.onNext(message); // send message to server only when subscribed to Flux
            });
         }
    }
    

    自定义实现总结
  • 铁杆
  • 没有实现反压支持,所以这可能是另一个挑战
  • 容易射中自己的脚

  • 外卖
  • 请使用RSocket,不要自己发明协议(protocol),这很困难!
  • 要从Pivotal专家那里了解有关RSocket的更多信息-https://www.youtube.com/watch?v=WVnAbv65uCU
  • 要从我的一次演讲中了解有关RSocket的更多信息-https://www.youtube.com/watch?v=XKMyj6arY2A
  • 在RSocket之上构建了一个称为Proteus的特色框架-您可能对此感兴趣-https://www.netifi.com/
  • 要从RSocket协议(protocol)的核心开发人员那里了解有关Proteus的更多信息-https://www.google.com/url?sa=t&source=web&rct=j&url=https://m.youtube.com/watch%3Fv%3D_rqQtkIeNIQ&ved=2ahUKEwjpyLTpsLzfAhXDDiwKHUUUA8gQt9IBMAR6BAgNEB8&usg=AOvVaw0B_VdOj42gjr0YrzLLUX1E
  • 10-08 00:49