问题描述
我有一个发射一些 Date
的通量.这个 Date
被映射到 1024 个模拟的 HTTP 请求,我在一些 Executer
上运行.
I have a flux that emits some Date
. This Date
is mapped to 1024 simulated HTTP requests that I'm running on some Executer
.
我想做的是在发出下一个 Date
之前等待所有 1024 个 HTTP 请求.
What I'd like to do is waiting for all the 1024 HTTP requests before emitting the next Date
.
当前运行时,onNext()
被多次调用,然后稳定在某个稳定的速率上.
Currently when running, onNext()
is called for many times and then it is stabilised on some steady rate.
我该如何改变这种行为?
How can I change this behaviour?
附言如果需要,我愿意转向架构.
P.S. I'm willing to change to architecture, if needed.
private void run() throws Exception {
Executor executor = Executors.newFixedThreadPool(2);
Flux<Date> source = Flux.generate(emitter ->
emitter.next(new Date())
);
source
.log()
.limitRate(1)
.doOnNext(date -> System.out.println("on next: " + date))
.map(date -> Flux.range(0, 1024))
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)))
.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
HTTP 请求模拟:
private static String simulateHttp() {
try {
System.out.println("start http call");
Thread.sleep(3_000);
} catch (Exception e) {}
return "HTML content";
}
改编自答案的代码:
- 首先,我的代码中有一个错误(需要另一个
flatMap
) 其次,我将
1
的concurrency
参数添加到两个flatMap
(似乎两者都需要)
- First, I had a bug in my code (another
flatMap
was needed) Second, I added
concurrency
parameter of1
to bothflatMap
(it seems that both are needed)
Executor executor = Executors.newSingleThreadExecutor();
Flux<Date> source = Flux.generate(emitter -> {
System.out.println("emitter called!");
emitter.next(new Date());
});
source
.limitRate(1)
.map(date -> Flux.range(0, 16))
.flatMap(Function.identity(), 1) # concurrency = 1
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)), 1) # concurrency = 1
.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
推荐答案
你应该看看这些方法:
concatMap
确保通量上的元素在运算符内部按顺序处理:
concatMap
ensures that the elements on the flux are processed sequentially inside the operator:
inners 和订阅的生成:这个操作符等待一个在生成下一个并订阅之前完成
flatMap
允许您通过公开 concurrency
和 prefetch
参数来执行相同的操作,这些参数可为您提供对此行为的更多控制:
flatMap
lets you do the same by exposing concurrency
and prefetch
parameters which provide you more control over this behavior:
并发参数允许控制可以有多少个发布者并行订阅和合并.反过来,该论证表明上游的第一个 Subscription.request(long) 的大小.这prefetch 参数允许为合并的发布者(换句话说,预取大小是指第一个 Subscription.request(long) 到合并的发布者.
这篇关于Project Reactor:如何控制通量发射的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!