我正在尝试仅使用Spring Data从反应性Mongodb存储库中获取新创建的消息。
客户端正在通过SSE获取消息。我正在使用“之后”查询,该查询仅应返回在“ LocalDateTime.now()”之后发送的消息。
不幸的是,SSE也推出了比“ now”还早的旧消息。我不知道为什么它会返回那些旧消息。
我的控制器方法:
@GetMapping(value = "/receiving-sse", produces = "text/event-stream")
public Flux<Message> streamEvents() {
Mono<String> username = getUsernameFromAuth();
Flux<Message> message = findOrCreateUser(username)
.flatMapMany(user -> messageRepository
.findAllBySenderIdOrReceiverIdAndSentAtAfter(user.getId(), user.getId(), LocalDateTime.now()));
Flux<Message> heartBeat = Flux.interval(Duration.ofSeconds(30)).map(sequence -> {
Message heartBeatMessage = new Message();
heartBeatMessage.setHeartbeat(true);
return heartBeatMessage;
});
return Flux.merge(message, heartBeat);
}
我的资料库:
public interface MessageRepository extends ReactiveMongoRepository<Message, String> {
Flux<Message> findAllByReceiverId(String receiverId);
@Tailable
Flux<Message> findAllBySenderIdOrReceiverIdAndSentAtAfter(String senderId, String receiverId, LocalDateTime sentAt);
Flux<Message> findAllBySenderId(String senderId);
Flux<Message> findAllByIdIn(Collection<String> ids);
}
而我的文件:
@Data
@Document
public class Message {
private String id;
private LocalDateTime sentAt;
private String message;
private boolean heartbeat;
@DBRef
private User sender;
@DBRef
private User receiver;
}
十分感谢有关回购为何要提取具有比“ LocalDateTime.now()”更旧的“ sentAt”的消息的任何提示。
最佳答案
问题出在LocalDateTime.now()
中。
LocalDateTime在Flux<Message> message
变量初始化期间创建一个时间。
您需要将其替换为Mono.defer()
之类的结构。它将在每个订阅中创建LocalDateTime。
更多内容,您可以阅读here
关于java - 响应式Spring Data Mongodb查询在不应该返回旧数据的情况下,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/57363983/