有什么方法可以迫使groupBy()生成的磁通在一段时间后(或类似地,限制“开放”组的最大数量)完成,而不管上游的完整性如何?我有类似以下内容:
Flux<Foo> someFastPublisher;
someFastPublisher
.groupBy(f -> f.getKey())
.delayElements(Duration.ofSeconds(1)) // rate limit each group
.flatMap(g -> g) // unwind the group
.subscribe()
;
并且由于组的数量大于
flatMap
的并发性而遇到了Flux挂起的情况。我可以增加flatMap
并发性,但是没有一种简单的方法可以知道最大可能的大小是多少。相反,我知道按Foo
分组的Foo.key
在时间/发布顺序上将彼此接近,并且宁愿在groupB上使用某种时间窗口通过Flux vs.flatMap并发(并且最终有两种不同带有相同key()
的组没什么大不了的)。我猜测
groupBy
onCompletes之前someFastPubisher
Flux不会onComplete-即Flux传递给flatMap
只是保持“打开”状态(尽管它们不太可能会发生新事件)。我可以通过预取groupBy中的
Integer.MAX
或并发Integer.MAX
来解决此问题,但是有没有办法控制该组的“生命”? 最佳答案
是:您可以将take(Duration)
应用于组,以确保它们早日关闭,并在之后打开具有相同 key 的新组:
source.groupBy(v -> v.intValue() % 2)
.flatMap(group -> group
.take(Duration.ofMillis(1000))
.count()
.map(c -> "group " + group.key() + " size = " + c)
)
.log()
.blockLast();
关于project-reactor - Reactor 3.x-限制组的时间,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47893575/