Webflux响应式编程利器
先来一张图,这是spring文档的一张截图,介绍了spring如今的两种开发模式,MVC和webflux两种开发模式,可见webflux的重要性
1. 初识SpringWebFlux
webflux 是spring5推出的一种响应式Web框架,它是一种非阻塞的开发模式,可以在一个线程里处理多个请求(非阻塞),运行在netty环境,也可以可以运行在servlet3.1之后的容器,支持异步servlet, 可以支持更高的并发量
2. 异步servlet
- 我们知道同步servlet阻塞了Tomcat容器的线程,当一个网络请求到我们的Tomcat容器之后,容器会给每个请求启动一个线程去处理,线程里面会调用一个servlet去处理,当使用同步servlet时,业务代码花多长时间,你的线程就要等待多长时间,这就是堵塞(同步和异步是服务器后台才有异步这个概念,对于浏览器来说所有的请求都是异步,前台都要花费业务逻辑时间)
- 异步servlet的主要作用是它不会堵塞Tomcat容器的servlet线程,它可以把一些耗时的操作放在一个独立的线程池,那么我们的servlet就可以立马返回,处理下一个请求,以此就可以达到高并发。
通过代码比较一下同步servlet与异步servlet
同步servlet
@WebServlet(urlPatterns = "/SyncServlet")
public class SyncServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
public SyncServlet() {
super();
}
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
long t1 = System.currentTimeMillis();
// 执行业务代码
doSomeThing(request, response);
System.out.println("sync use:" + (System.currentTimeMillis() - t1));
}
private void doSomeThing(HttpServletRequest request,
HttpServletResponse response) throws IOException {
// 模拟耗时操作
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
response.getWriter().append("done");
}
}
异步servlet
@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
public AsyncServlet() {
super();
}
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
long t1 = System.currentTimeMillis();
// 开启异步
AsyncContext asyncContext = request.startAsync();
// 执行业务代码,放入一个线程池里
CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
asyncContext.getRequest(), asyncContext.getResponse()));
System.out.println("async use:" + (System.currentTimeMillis() - t1));
}
private void doSomeThing(AsyncContext asyncContext,
ServletRequest servletRequest, ServletResponse servletResponse) {
// 模拟耗时操作
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
servletResponse.getWriter().append("done");
} catch (IOException e) {
e.printStackTrace();
}
// 业务代码处理完毕, 通知结束
asyncContext.complete();
}
}
- 通过以上两段代码控制台的打印结果可以看出,异步servlet把耗时操作放在一个独立的线程池,那么我们的servlet就可以立马返回,处理下一个请求。
3. CRUD完整示例
通过下图可以看出MVC和wenflux的区别
- 以下通过一个例子了解一下webflux开发
- 实体类
@Document(collection = "user")
@Data
public class User {
@Id
private String id;
@NotBlank
private String name;
@Range(min=10, max=100)
private int age;
}
- Controller层
@RestController
@RequestMapping("/user")
public class UserController {
private final UserRepository repository;
public UserController(UserRepository repository) {
this.repository = repository;
}
/**
* 以数组形式一次性返回数据
*/
@GetMapping("/")
public Flux<User> getAll() {
return repository.findAll();
}
/**
* 以SSE形式多次返回数据
*/
@GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamGetAll() {
return repository.findAll();
}
/**
* 新增数据
*/
@PostMapping("/")
public Mono<User> createUser(@Valid @RequestBody User user) {
// spring data jpa 里面, 新增和修改都是save. 有id是修改, id为空是新增
// 根据实际情况是否置空id
user.setId(null);
CheckUtil.checkName(user.getName());
return this.repository.save(user);
}
/**
* 根据id删除用户 存在的时候返回200, 不存在返回404
*/
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> deleteUser(
@PathVariable("id") String id) {
// deletebyID 没有返回值, 不能判断数据是否存在
// this.repository.deleteById(id)
return this.repository.findById(id)
// 当你要操作数据, 并返回一个Mono 这个时候使用flatMap
// 如果不操作数据, 只是转换数据, 使用map
.flatMap(user -> this.repository.delete(user).then(
Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 修改数据 存在的时候返回200 和修改后的数据, 不存在的时候返回404
*/
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id,
@Valid @RequestBody User user) {
CheckUtil.checkName(user.getName());
return this.repository.findById(id)
// flatMap 操作数据
.flatMap(u -> {
u.setAge(user.getAge());
u.setName(user.getName());
return this.repository.save(u);
})
// map: 转换数据
.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 根据ID查找用户 存在返回用户信息, 不存在返回404
*/
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> findUserById(
@PathVariable("id") String id) {
return this.repository.findById(id)
.map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 根据年龄查找用户
*/
@GetMapping("/age/{start}/{end}")
public Flux<User> findByAge(@PathVariable("start") int start,
@PathVariable("end") int end) {
return this.repository.findByAgeBetween(start, end);
}
/**
* 根据年龄查找用户
*/
@GetMapping(value = "/stream/age/{start}/{end}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamFindByAge(@PathVariable("start") int start,
@PathVariable("end") int end) {
return this.repository.findByAgeBetween(start, end);
}
/**
* 得到20-30用户
*/
@GetMapping("/old")
public Flux<User> oldUser() {
return this.repository.oldUser();
}
/**
* 得到20-30用户
*/
@GetMapping(value = "/stream/old", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamOldUser() {
return this.repository.oldUser();
}
}
- Repository层
@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {
/**
* 根据年龄查找用户
*/
Flux<User> findByAgeBetween(int start, int end);
@Query("{'age':{ '$gte': 20, '$lte' : 30}}")
Flux<User> oldUser();
}
- 以上代码没有进行校验,当然没有校验的代码是不能用的,校验代码我就不放了,想了解的GitHub上有完整代码。