我试图了解如何在Spring WebFlux中施加反压。我了解背压的理论,但是我无法重现它,因此我并不完全了解它。

让我们来看下面的例子:

public void test() throws InterruptedException {
    EmitterProcessor<String> processor = EmitterProcessor.create();

    new Thread(() -> {
        int i = 0;
        while(runThread) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
            processor.onNext("Value: " + i);
            i++;
        }
        processor.onComplete();
    }).start();

    processor
            .subscribe(makeSubscriber("FIRST - "), Throwable::printStackTrace);
}

private Consumer<String> makeSubscriber(String label) {
    return v -> {
        System.out.println(label + v);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignored) {
        }
    };
}


我以EmitterProcessor的形式创建了一个热通量,并在一个单独的线程中开始为其生成数据。
低一点,我订阅它。订户比元素生产的速度慢,因此问题应该开始出现,对吗?
但是订户逻辑在生产者线程上运行。当我调用processor.onNext()时,它会同步调用所有订阅者,因此,如果订阅者很慢,发布者的速度也会变慢。因此,反压似乎根本没有用。

我还尝试制作了两个Spring Boot WebFlux应用程序,一个具有Flux端点,一个使用了该端点,因此可以确定使用者在单独的线程上运行。但是,然后,我对消费者施加反压的任何尝试都无济于事。没有缓冲区被填充,没有被丢弃或任何东西!

谁能给我一个具体的背压示例?最好在Spring WebFlux中使用,但我将使用任何反应式Java库。

最佳答案

您选择的subscribe方法的变体文档如下:

订阅将请求无限制的需求(Long.MAX_VALUE)。

也就是说,您自己关闭了背压。

要使用背压,请预订Flux.subscribe(Subscriber)

08-06 05:34