本文介绍了Project Reactor:如何控制通量发射的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个发射一些 Date 的通量.这个 Date 被映射到 1024 个模拟的 HTTP 请求,我在一些 Executer 上运行.

I have a flux that emits some Date. This Date is mapped to 1024 simulated HTTP requests that I'm running on some Executer.

我想做的是在发出下一个 Date 之前等待所有 1024 个 HTTP 请求.

What I'd like to do is waiting for all the 1024 HTTP requests before emitting the next Date.

当前运行时,onNext() 被多次调用,然后稳定在某个稳定的速率上.

Currently when running, onNext() is called for many times and then it is stabilised on some steady rate.

我该如何改变这种行为?

How can I change this behaviour?

附言如果需要,我愿意转向架构.

P.S. I'm willing to change to architecture, if needed.

private void run() throws Exception {

    Executor executor = Executors.newFixedThreadPool(2);

    Flux<Date> source = Flux.generate(emitter ->
        emitter.next(new Date())
    );

    source
            .log()
            .limitRate(1)
            .doOnNext(date -> System.out.println("on next: " + date))
            .map(date -> Flux.range(0, 1024))
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)))
            .subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}

HTTP 请求模拟:

private static String simulateHttp() {
    try {
        System.out.println("start http call");
        Thread.sleep(3_000);
    } catch (Exception e) {}

    return "HTML content";
}

改编自答案的代码:

  • 首先,我的代码中有一个错误(需要另一个 flatMap)
  • 其次,我将1concurrency参数添加到两个flatMap(似乎两者都需要)

  • First, I had a bug in my code (another flatMap was needed)
  • Second, I added concurrency parameter of 1 to both flatMap (it seems that both are needed)

Executor executor = Executors.newSingleThreadExecutor();

Flux<Date> source = Flux.generate(emitter -> {
    System.out.println("emitter called!");
    emitter.next(new Date());
});

source
        .limitRate(1)
        .map(date -> Flux.range(0, 16))
        .flatMap(Function.identity(), 1) # concurrency = 1
        .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                .subscribeOn(Schedulers.fromExecutor(executor)), 1) # concurrency = 1
        .subscribe(s -> System.out.println(s));

Thread.currentThread().join();

推荐答案

你应该看看这些方法:

concatMap 确保通量上的元素在运算符内部按顺序处理:

concatMap ensures that the elements on the flux are processed sequentially inside the operator:

inners 和订阅的生成:这个操作符等待一个在生成下一个并订阅之前完成

flatMap 允许您通过公开 concurrencyprefetch 参数来执行相同的操作,这些参数可为您提供对此行为的更多控制:

flatMap lets you do the same by exposing concurrency and prefetch parameters which provide you more control over this behavior:

并发参数允许控制可以有多少个发布者并行订阅和合并.反过来,该论证表明上游的第一个 Subscription.request(long) 的大小.这prefetch 参数允许为合并的发布者(换句话说,预取大小是指第一个 Subscription.request(long) 到合并的发布者.

这篇关于Project Reactor:如何控制通量发射的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-21 06:09