我试图了解如何在Spring WebFlux中施加反压。我了解背压的理论,但是我无法重现它,因此我并不完全了解它。
让我们来看下面的例子:
public void test() throws InterruptedException {
EmitterProcessor<String> processor = EmitterProcessor.create();
new Thread(() -> {
int i = 0;
while(runThread) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
processor.onNext("Value: " + i);
i++;
}
processor.onComplete();
}).start();
processor
.subscribe(makeSubscriber("FIRST - "), Throwable::printStackTrace);
}
private Consumer<String> makeSubscriber(String label) {
return v -> {
System.out.println(label + v);
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
};
}
我以EmitterProcessor的形式创建了一个热通量,并在一个单独的线程中开始为其生成数据。
低一点,我订阅它。订户比元素生产的速度慢,因此问题应该开始出现,对吗?
但是订户逻辑在生产者线程上运行。当我调用processor.onNext()时,它会同步调用所有订阅者,因此,如果订阅者很慢,发布者的速度也会变慢。因此,反压似乎根本没有用。
我还尝试制作了两个Spring Boot WebFlux应用程序,一个具有Flux端点,一个使用了该端点,因此可以确定使用者在单独的线程上运行。但是,然后,我对消费者施加反压的任何尝试都无济于事。没有缓冲区被填充,没有被丢弃或任何东西!
谁能给我一个具体的背压示例?最好在Spring WebFlux中使用,但我将使用任何反应式Java库。
最佳答案
您选择的subscribe
方法的变体文档如下:
订阅将请求无限制的需求(Long.MAX_VALUE)。
也就是说,您自己关闭了背压。
要使用背压,请预订Flux.subscribe(Subscriber)