Webflux响应式编程利器

先来一张图,这是spring文档的一张截图,介绍了spring如今的两种开发模式,MVC和webflux两种开发模式,可见webflux的重要性

(4)实战开发——Webflux响应式编程利器-LMLPHP

1. 初识SpringWebFlux

webflux 是spring5推出的一种响应式Web框架,它是一种非阻塞的开发模式,可以在一个线程里处理多个请求(非阻塞),运行在netty环境,也可以可以运行在servlet3.1之后的容器,支持异步servlet, 可以支持更高的并发量

2. 异步servlet

  • 我们知道同步servlet阻塞了Tomcat容器的线程,当一个网络请求到我们的Tomcat容器之后,容器会给每个请求启动一个线程去处理,线程里面会调用一个servlet去处理,当使用同步servlet时,业务代码花多长时间,你的线程就要等待多长时间,这就是堵塞(同步和异步是服务器后台才有异步这个概念,对于浏览器来说所有的请求都是异步,前台都要花费业务逻辑时间)
  • 异步servlet的主要作用是它不会堵塞Tomcat容器的servlet线程,它可以把一些耗时的操作放在一个独立的线程池,那么我们的servlet就可以立马返回,处理下一个请求,以此就可以达到高并发。
    通过代码比较一下同步servlet与异步servlet

同步servlet

@WebServlet(urlPatterns = "/SyncServlet")
public class SyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public SyncServlet() {
        super();
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();
        // 执行业务代码
        doSomeThing(request, response);
        System.out.println("sync use:" + (System.currentTimeMillis() - t1));
    }
    private void doSomeThing(HttpServletRequest request,
                             HttpServletResponse response) throws IOException {
        // 模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        response.getWriter().append("done");
    }
}

异步servlet

@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public AsyncServlet() {
        super();
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 开启异步
        AsyncContext asyncContext = request.startAsync();

        // 执行业务代码,放入一个线程池里
        CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
                asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use:" + (System.currentTimeMillis() - t1));
    }
    private void doSomeThing(AsyncContext asyncContext,
                             ServletRequest servletRequest, ServletResponse servletResponse) {

        // 模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            servletResponse.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 业务代码处理完毕, 通知结束
        asyncContext.complete();
    }
}

(4)实战开发——Webflux响应式编程利器-LMLPHP

  • 通过以上两段代码控制台的打印结果可以看出,异步servlet把耗时操作放在一个独立的线程池,那么我们的servlet就可以立马返回,处理下一个请求。

3. CRUD完整示例

  • 通过下图可以看出MVC和wenflux的区别
    (4)实战开发——Webflux响应式编程利器-LMLPHP

  • 以下通过一个例子了解一下webflux开发
  1. 实体类
@Document(collection = "user")
@Data
public class User {

    @Id
    private String id;

    @NotBlank
    private String name;

    @Range(min=10, max=100)
    private int age;

}
  1. Controller层
@RestController
@RequestMapping("/user")
public class UserController {

    private final UserRepository repository;

    public UserController(UserRepository repository) {
        this.repository = repository;
    }

    /**
     * 以数组形式一次性返回数据
     */
    @GetMapping("/")
    public Flux<User> getAll() {
        return repository.findAll();
    }

    /**
     * 以SSE形式多次返回数据
     */
    @GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamGetAll() {
        return repository.findAll();
    }

    /**
     * 新增数据
     */
    @PostMapping("/")
    public Mono<User> createUser(@Valid @RequestBody User user) {
        // spring data jpa 里面, 新增和修改都是save. 有id是修改, id为空是新增
        // 根据实际情况是否置空id
        user.setId(null);
        CheckUtil.checkName(user.getName());
        return this.repository.save(user);
    }

    /**
     * 根据id删除用户 存在的时候返回200, 不存在返回404
     */
    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<Void>> deleteUser(
            @PathVariable("id") String id) {
        // deletebyID 没有返回值, 不能判断数据是否存在
        // this.repository.deleteById(id)
        return this.repository.findById(id)
                // 当你要操作数据, 并返回一个Mono 这个时候使用flatMap
                // 如果不操作数据, 只是转换数据, 使用map
                .flatMap(user -> this.repository.delete(user).then(
                        Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    /**
     * 修改数据 存在的时候返回200 和修改后的数据, 不存在的时候返回404
     */
    @PutMapping("/{id}")
    public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id,
            @Valid @RequestBody User user) {
        CheckUtil.checkName(user.getName());
        return this.repository.findById(id)
                // flatMap 操作数据
                .flatMap(u -> {
                    u.setAge(user.getAge());
                    u.setName(user.getName());
                    return this.repository.save(u);
                })
                // map: 转换数据
                .map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    /**
     * 根据ID查找用户 存在返回用户信息, 不存在返回404
     */
    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> findUserById(
            @PathVariable("id") String id) {
        return this.repository.findById(id)
                .map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    /**
     * 根据年龄查找用户
     */
    @GetMapping("/age/{start}/{end}")
    public Flux<User> findByAge(@PathVariable("start") int start,
            @PathVariable("end") int end) {
        return this.repository.findByAgeBetween(start, end);
    }

    /**
     * 根据年龄查找用户
     */
    @GetMapping(value = "/stream/age/{start}/{end}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamFindByAge(@PathVariable("start") int start,
            @PathVariable("end") int end) {
        return this.repository.findByAgeBetween(start, end);
    }

    /**
     *  得到20-30用户
     */
    @GetMapping("/old")

    public Flux<User> oldUser() {
        return this.repository.oldUser();
    }

    /**
     * 得到20-30用户
     */
    @GetMapping(value = "/stream/old", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamOldUser() {
        return this.repository.oldUser();
    }

}
  1. Repository层
@Repository
public interface UserRepository extends ReactiveMongoRepository<User, String> {

    /**
     * 根据年龄查找用户
     */
    Flux<User> findByAgeBetween(int start, int end);

    @Query("{'age':{ '$gte': 20, '$lte' : 30}}")
    Flux<User> oldUser();
}
  • 以上代码没有进行校验,当然没有校验的代码是不能用的,校验代码我就不放了,想了解的GitHub上有完整代码。
01-22 23:53