好的,我是RSocket的新手。我正在尝试创建一个简单的RSocket客户端和简单的RSocket服务器。根据我所做的研究,它说RSocket支持恢复:

它特别有用,因为当发送包含有关上一个接收到的帧的信息的RESUME帧时,客户端能够恢复连接并仅请求尚未接收到的数据,从而避免了不必要的服务器负载并浪费了尝试检索已检索的数据。

它还说客户是负责恢复的负责人。我的问题是如何启用此恢复以及如何发送该RESUME帧。我具有功能完备的客户端和服务器,但是如果关闭服务器并再次启动它,则什么也没有发生,稍后当客户端再次尝试与服务器通信时,它会抛出:java.nio.channels.ClosedChannelException。

这是我的客户端配置:

@Configuration
public class ClientConfiguration {

/**
 * Defining the RSocket client to use tcp transport on port 7000
 */
@Bean
public RSocket rSocket() {
    return RSocketFactory
            .connect()
            .resumeSessionDuration(Duration.ofDays(10))
            .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create(7000))
            .start()
            .block();
}

/**
 * RSocketRequester bean which is a wrapper around RSocket
 * and it is used to communicate with the RSocket server
 */
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
    return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}

}

这是一个RestController,我将从中开始与rsocket服务器的通信:
@RestController
public class UserDataRestController {

private final RSocketRequester rSocketRequester;

public UserDataRestController(RSocketRequester.Builder rSocketRequester) {
    this.rSocketRequester = rSocketRequester.connectTcp("localhost", 7000).block();
}

@GetMapping(value = "/feed/{firstName}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Person> feed(@PathVariable("firstName") String firstName) {
    return rSocketRequester
            .route("feedPersonData")
            .data(new PersonDataRequest(firstName))
            .retrieveFlux(Person.class);
}

}

最佳答案

由于会话存储在内存中,因此服务器重启后将无法继续。参见io.rsocket.resume.SessionManager#sessions

但是,如果重新连接到同一服务器,您仍然可以保护自己免受网络问题的影响。而且您不必发送RESUME帧,客户端可以为您完成。

您应该配置服务器:

@Bean
ServerRSocketFactoryProcessor serverRSocketFactoryProcessor() {
    return RSocketFactory.ServerRSocketFactory::resume;
}

和客户端io.rsocket.RSocketFactory.ClientRSocketFactory#resume

您可以找到几乎完整的示例here

09-27 18:23