有什么方法可以迫使groupBy()生成的磁通在一段时间后(或类似地,限制“开放”组的最大数量)完成,而不管上游的完整性如何?我有类似以下内容:

Flux<Foo> someFastPublisher;

someFastPublisher
  .groupBy(f -> f.getKey())
  .delayElements(Duration.ofSeconds(1)) // rate limit each group
  .flatMap(g -> g) // unwind the group
  .subscribe()
;

并且由于组的数量大于flatMap的并发性而遇到了Flux挂起的情况。我可以增加flatMap并发性,但是没有一种简单的方法可以知道最大可能的大小是多少。相反,我知道按Foo分组的Foo.key在时间/发布顺序上将彼此接近,并且宁愿在groupB上使用某种时间窗口通过Flux vs.flatMap并发(并且最终有两种不同带有相同key()的组没什么大不了的)。

我猜测groupBy onCompletes之前someFastPubisher Flux不会onComplete-即Flux传递给flatMap只是保持“打开”状态(尽管它们不太可能会发生新事件)。

我可以通过预取groupBy中的Integer.MAX或并发Integer.MAX来解决此问题,但是有没有办法控制该组的“生命”?

最佳答案

是:您可以将take(Duration)应用于组,以确保它们早日关闭,并在之后打开具有相同 key 的新组:

source.groupBy(v -> v.intValue() % 2)
      .flatMap(group -> group
              .take(Duration.ofMillis(1000))
              .count()
              .map(c -> "group " + group.key() + " size = " + c)
      )
      .log()
      .blockLast();

关于project-reactor - Reactor 3.x-限制组的时间,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47893575/

10-12 19:55