解决方案摘要:
在当前的大多数RSocket示例中,即使在SpringBoot相关的教程中,服务器端接受器也被简单地构造为新对象(例如下面的新MqttMessageService())。如果您在acceptor类中生成示例内容,这很好,但是当acceptor依赖于容器中的其他bean时,可能导致以下与依赖项注入相关的混乱。
原始问题:
当尝试通过Rsocket的Java服务器使用Spring Data Reactive Mongodb存储库流式传输数据库条目时,出现NullPointerException。
问题是,在调试期间,所有组件都单独工作:我可以通过相同的Mongodb存储库获取请求的数据,也可以使用Rsocket在同一服务器和客户端之间流式传输随机生成的数据。
因此,我要么缺少一些真正的基础知识,要么将Reactive Mongodb和Rsocket一起使用可能会出现问题。
这是原始的服务器端Rsocket配置:
@Configuration
public class RsocketConfig {
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
这是带有正确DI的工作服务器端Rsocket配置:
@Configuration
public class RsocketConfig {
@Autowired
MqttMessageService messageService;
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(messageService))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
这是服务器端AbstractRSocket实现,其中在return service.findAll()处引发NullPointerException。
@Service
public class MqttMessageService extends AbstractRSocket {
@Autowired
private MqttMessageEntityService service;
@Override
public Flux<Payload> requestStream(Payload payload) {
return service.findAll()
.map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));
}
}
这是反应式存储库和相关服务。当注入到服务器的AbstractRSocket实现中时,该服务返回null,但是注入到其他类中时,该函数可以正常工作:
@Service
public class MqttMessageEntityService {
@Autowired
private MqttMessageEntityRepository repository;
public Flux<MqttMessageEntity> findAll() {
return repository.findAll();
}
}
public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {
}
这是与测试内容完美配合的客户端代码:
@Configuration
public class RsocketConfig {
@PostConstruct
public void testRsocket() {
RSocket rSocketClient = RSocketFactory
.connect()
.transport(TcpClientTransport.create(8802))
.start()
.block();
rSocketClient
.requestStream(DefaultPayload.create(""))
.blockLast();
}
}
我在这里的知识水平可能有点高,并且该主题的资源非常有限,因此,我很感谢提出解决方案的任何提示:)
最佳答案
关于
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose();
}
您是否正在使用使服务器保持 Activity 状态?如果是这样,则在onClose()之后添加另一个块。
messageEntityService是否为null?因为这看起来像是唯一的变量,如果不是topicStart和module,可能会导致错误。尤其是如果其他代码有效-从RSocket端我看不到任何会引起问题的东西。