我有一个Spring Boot Web应用程序,该应用程序具有更新名为StudioLinking的实体的功能。此实体描述了两个IoT设备之间的临时,可变,描述性逻辑链接,我的Web应用程序是它们的云服务。这些设备之间的链接本质上是短暂的,但是StudioLinking实体保留在数据库中以用于报告。使用Spring Data / Hibernate以传统方式将StudioLinking存储到基于SQL的数据存储中。该StudioLinking实体会不时地使用来自Rest API的新信息进行更新。链接更新后,设备需要响应(更改颜色,音量等)。现在,通过每5秒轮询一次来进行处理,但这会造成人类用户向系统中输入更新以及IoT设备实际更新时的延迟。可能只有一毫秒或最多5秒!显然,增加轮询的频率是不可持续的,并且绝大多数时间根本没有更新!
因此,我正在尝试使用HTTP Long Polling在同一应用程序上开发另一个Rest API,该API将在给定StudioLinking实体更新或超时后返回。侦听器不支持WebSocket或类似功能,使我无法进行Long Polling。长轮询可能会导致竞争状态,在这种情况下,您必须考虑以下可能性:对于连续消息,一条消息可能会在HTTP请求之间进入时“丢失”(在关闭和打开连接时,可能会出现新的“更新”如果使用发布/订阅,则不会被“注意到”)。
重要的是要注意,此“订阅更新” API应该只返回StudioLinking的LATEST和CURRENT版本,但仅当存在实际更新或自上次 checkin 后发生更新时才这样做。 “订阅更新”客户端将首先发布API请求,以设置新的侦听 session 并将其传递,以便服务器知道它们是谁。因为可能有多个设备需要监视对同一个StudioLinking实体的更新。我相信我可以通过在Redis XREAD中使用单独命名的使用者来完成此操作。 (请牢记这一点,以供稍后在问题中使用)
经过数小时的研究,我相信完成此操作的方法是使用redis流。
我在Spring Data Redis中找到了有关Redis流的以下两个链接:
https://www.vinsguru.com/redis-reactive-stream-real-time-producing-consuming-streams-with-spring-boot/
https://medium.com/@amitptl.in/redis-stream-in-action-using-java-and-spring-data-redis-a73257f9a281
我还阅读了有关长轮询的链接,这两个链接在长轮询期间都只有一个睡眠计时器,用于演示,但显然我想做一些有用的事情。
https://www.baeldung.com/spring-deferred-result
这两个链接都非常有帮助。现在,我很清楚如何将更新发布到Redis Stream上-(这是未经测试的“伪代码”,但我预计实现此目的不会有任何问题)

// In my StudioLinking Entity

@PostUpdate
public void postToRedis() {
    StudioLinking link = this;
    ObjectRecord<String, StudioLinking> record = StreamRecords.newRecord()
            .ofObject(link)
            .withStreamKey(streamKey); //I am creating a stream for each individual linking probably?
    this.redisTemplate
            .opsForStream()
            .add(record)
            .subscribe(System.out::println);
    atomicInteger.incrementAndGet();
}
但是当涉及到上述流时,我会保持一致:所以基本上我想在这里做-请原谅伪造的伪代码,这仅出于想法目的。我很清楚,代码绝不表示语言和框架的实际行为:)
// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public DeferredResult<ResponseEntity<?>> subscribeToUpdates(@PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
    LOG.info("Received async-deferredresult request");
    DeferredResult<ResponseEntity<?>> output = new DeferredResult<>(5000l);

    deferredResult.onTimeout(() ->
      deferredResult.setErrorResult(
        ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
          .body("IT WAS NOT UPDATED!")));

    ForkJoinPool.commonPool().submit(() -> {
        //----------------------------------------------
        // Made up stuff... here is where I want to subscribe to a stream and block!
        //----------------------------------------------
        LOG.info("Processing in separate thread");
        try {
            // Subscribe to Redis Stream, get any updates that happened between long-polls
            // then block until/if a new message comes over the stream
            var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(studioLinkingID, updateList),
                StreamOffset.create(studioLinkingID, ReadOffset.lastConsumed()),
                streamListener);
            listenerContainer.start();
        } catch (InterruptedException e) {
        }
        output.setResult("IT WAS UPDATED!");
    });

    LOG.info("servlet thread freed");
    return output;
}
因此,对于我将如何解决这个问题,是否有很好的解释?我认为答案就在https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveRedisTemplate.html之内,但是我对Spring的熟练程度还不足以真正理解Java Docs中的术语(Spring文档确实很好,但是JavaDocs是用密集的技术语言编写的,对此我表示赞赏,但我不愿意相当了解)。
我的实现还有两个障碍:
  • 我对spring的确切了解还不是100%。我还没有到达那一刻,我真的完全理解为什么所有这些bean都在漂浮。我认为这是为什么我不能在这里得到东西的关键... Redis的配置在Spring以太坊中 float ,我不了解如何称呼它。我真的需要继续对此进行调查(这对我来说是一个巨大的障碍)。
  • 这些StudioLinking是短暂的,因此我也需要进行一些清理。一旦我将整个事情付诸实践,我将在稍后实现,我确实知道将需要它。
  • 最佳答案

    为什么不使用阻塞式轮询机制?无需使用spring-data-redis的高级工具。只需使用5秒钟的简单阻塞读取,所以此调用可能需要6秒钟左右的时间。您可以减少或增加阻止超时。

    class LinkStatus {
        private final boolean updated;
    
        LinkStatus(boolean updated) {
          this.updated = updated;
        }
      }
    
    
    
    // Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
      // updateList is a unique token to track individual consumers in Redis
      @GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
      public LinkStatus subscribeToUpdates(
          @PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
        StreamOperations<String, String, String> op = redisTemplate.opsForStream();
    
        Consumer consumer = Consumer.from("test-group", "test-consumer");
        // auto ack block stream read with size 1 with timeout of 5 seconds
        StreamReadOptions readOptions = StreamReadOptions.empty().block(Duration.ofSeconds(5)).count(1);
        List<MapRecord<String, String, String>> records =
            op.read(consumer, readOptions, StreamOffset.latest("test-stream"));
        return new LinkStatus(!CollectionUtils.isEmpty(records));
      }
    

    10-05 21:19
    查看更多