我将 Spring Webflux 与 Spring 数据 jpa 一起使用,使用 PostgreSql 作为后端数据库.我不想在进行诸如 find
和 save
之类的数据库调用时阻塞主线程.为了实现相同的目的,我在 Controller
类和 jdbcScheduler
I am using Spring Webflux with Spring data jpa using PostgreSql as backend db.I don't want to block the main thread while making db calls like find
and save
.To achieve the same, I have a main scheduler in Controller
class and a jdbcScheduler
service classes.
public class CommonConfig {
int connectionPoolSize;
public Scheduler scheduler() {
return Schedulers.parallel();
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
return new TransactionTemplate(transactionManager);
现在,在我的服务层中执行 get/save 调用时:
Now, while doing a get/save call in my service layer I do:
public Mono<Config> getConfigByKey(String key) {
return Mono.defer(
() -> Mono.justOrEmpty(configRepository.findByKey(key)))
public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
return Flux
public Flux<Config> addConfigs(List<Config> configList) {
return Flux.fromIterable(configRepository.saveAll(configList))
And in controller, I do:
Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
return configService.addConfigs(configs).collectList()
.map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
Is this correct? and/or there is a way better way to do it?
是该任务将在 jdbcScheduler
线程上运行,随后的结果将发布在我的主并行 scheduler
is that task will run on jdbcScheduler
threads and later result will be published on my main parallel scheduler
. Is this understanding correct?
您对 publishOn
和 subscribeOn
如果您调用阻塞库而不调度它在特定调度程序上工作,这些调用将阻塞少数可用线程之一(默认情况下,Netty 事件循环),并且您的应用程序将只能同时处理少数请求.
If you call blocking libraries without scheduling that work on a specific scheduler, those calls will block one of the few threads available (by default, the Netty event loop) and your application will only be able to serve a few requests concurrently.
Now I'm not sure what you're trying to achieve by doing that.
首先,并行调度器是为 CPU 密集型任务设计的,意思是您将拥有的数量很少,与 CPU 内核一样多(或更多).在这种情况下,这就像将线程池大小设置为常规 Servlet 容器上的核心数.您的应用将无法处理大量并发请求.
First, the parallel scheduler is designed for CPU bound tasks, meaning you'll have few of them, as many (or a bit more) as CPU cores. In this case, it's like setting your threadpool size to the number of cores on a regular Servlet container. Your app won't be able to process a large number of concurrent requests.
即使您选择了更好的替代方案(如弹性调度程序),它仍然不如 Netty 事件循环,后者是 Spring WebFlux 中本地调度请求处理的地方.
Even if you choose a better alternative (like the elastic Scheduler), it will be still not as good as the Netty event loop, which is where request processing is scheduled natively in Spring WebFlux.
如果您的最终目标是性能和可扩展性,那么在反应式应用程序中包装阻塞调用的性能可能比您的常规 Servlet 容器差.
If your ultimate goal is performance and scalability, wrapping blocking calls in a reactive app is likely to perform worse than your regular Servlet container.
您可以改为使用 Spring MVC 和:
You could instead use Spring MVC and:
- 在处理阻塞库(如 JPA)时使用通常的阻塞返回类型
- 当您不依赖于此类库时,请使用
- use usual blocking return types when you're dealing with a blocking library, like JPA
- use
return types when you're not tied to such libraries
This won't be non-blocking, but this will be asynchronous still and you'll be able to do more work in parallel without dealing with the complexity.
