有人可以解释takeUntilOther()
方法如何工作吗?我尝试运行以下代码,但在控制台上什么也没显示。
Mono.just(10)
.subscribe();
Flux.range(1, 5)
.takeUntilOther(Mono.just(10))
.subscribe(System.out::println);
我不明白为什么。
最佳答案
基里尔
我建议您参考the project reactor's documentation的适当部分。
takeUntilOther(Publisher<?> other)
中继此磁通量中的值,直到发出给定的发布者为止。
意思是,您将接收来自原始Flux的值,直到给定的Publisher<?> other
开始产生事件为止。在您的情况下,您有一个hot publisher just()
可以立即中断原始Flux(通过调用cancel()
方法)。
我再举一个例子。请看以下代码片段:
Flux.range(1, 5) // produces elements from 1 to 5
.delayElements(Duration.ofSeconds(1)) // delays emission of each element from above for 1 second
.takeUntilOther(Mono
.just(10) // hot publisher. emits one element
// delays '10' for 3 seconds. meaning that it will only
// appears in the original Flux in 3 seconds
.delayElement(Duration.ofSeconds(3))
)
.subscribe(System.out::print);
输出为:
12