Spring Reactor

Reactor 对比

1.1 Reactor 线程模型

  • 多线程
  • 2.1 回调

    回调其实就是把当前的事情完成之后,后面需要做的事当成函数传进行,等完成之后调用就行

    **public static void **main( String[] args ){
    _       doA_( ( next ) -> {
              _log_.info( **"doB" **);
              next.run();
           }, () -> _log_.info( **"doC" **) );
    
        }
        **public static void **doA( Consumer<Runnable> next, Runnable nextNext ){
    
           _log_.info( **"doA" **);
           next.accept( nextNext );
        }
        
    // output
    15:06:52.818 [main] INFO concurrent.CompleteTest - doA
    15:06:52.820 [main] INFO concurrent.CompleteTest - doB
    15:06:52.820 [main] INFO concurrent.CompleteTest - doC
    

    回调是在一个线程中来完成的,很容易理解,但问题是回调太多代码就变的很复杂,有回调地域的问题

    回调只是一种异步的编程方式,本身实现异步其实还是需要多线程,例如单独起一个监听线程来执行回调函数,例如 EventListener

    如果执行的任务不考虑线程安全问题的话,可以使用 CompletableFuture 来解决,会更加易于阅读

    CompletableFuture
           ._runAsync_( ()-> _log_.info(**"doA"**) )
           .thenRunAsync( ()-> _log_.info(**"doB"**) )
           .thenRunAsync( ()->_log_.info(**"doC"**) )
           .get();
    
    // output
    15:08:04.407 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doA
    15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doB
    15:08:04.410 [ForkJoinPool.commonPool-worker-1] INFO concurrent.CompleteTest - doC
    
    Mono._just_(**""**)
           .doOnNext( (x)-> _log_.info(**"doA"**) )
           .doOnNext( (x)-> _log_.info(**"doB"**) )
           .doOnNext( (x)-> _log_.info(**"doC"**) )
           .block();
    15:12:56.160 [main] INFO concurrent.CompleteTest - doA
    15:12:56.160 [main] INFO concurrent.CompleteTest - doB
    15:12:56.161 [main] INFO concurrent.CompleteTest - doC
    

    2.2 多线程

    多线程的方式,大家应该都很熟悉

    1. Thread
    2. ExecutorService 线程池
    3. CompletionService 带结果队列的线程池
    4. CompletableFuture 用于任务编排
    5. Runable、Callable、Future、CompletableFuture

    Spring Reactor

    从上面可以看到一些使用 Reactor 的代码中,都可以在原生 JDK 中找到替换,那我们为什么还需要它呢?

    • 可组合和可读性
    • 丰富的操作
    • 订阅之前什么都不会发生
    • 背压

    下面是 Java9 中 Flow 类的类图,SpringReactor 也是使用这四个类,在 Java9 中已经成了规范

    Spring Reactor基本介绍和案例-LMLPHP

    3.1 Publisher

    • Mono,提供 0 到 1 个 Item

    Spring Reactor基本介绍和案例-LMLPHP

    • Flux,提供 0 到 N 个 Item

    Spring Reactor基本介绍和案例-LMLPHP

    发布者提供 n 个 Item, 经过一些 operator(数据处理操作),完成或者异常中止

    核心方法:

    • subscribe

    3.1.1 创建

    Mono<String> noData = Mono.empty(); 
    Mono<String> data = Mono.just("foo");
    Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); 
    
    Mono._fromSupplier_( ()->**1 **);
    Mono._fromFuture_( CompletableFuture._runAsync_( ()-> {} ) );
    Flux._create_((sink)->{
        **for**( **int **i = **0**; i < **5**; i++ ){
           sink.next( i ) ;
        }
        sink.complete();
    });
    

    3.1.2 处理

    下面这些都称为 operator,可以很灵活处理其中的 Item

    • 转化 map、flatMap、
    • 消费 doOnNext、doNextError、doOnCancel
    • 过滤 filter、distinct、take
    • 错误处理 onErrorReturn、onErrorComplete、onErrorResume、doFinally
    • 时间相关 timeout、interval、delay
    • 分隔 window、buffer
    • 转同步 block、toStream

    3.1.3 订阅

    订阅然后消费发布者的内容

    subscribe(); 
    subscribe(Consumer<? super T> consumer);
    

    订阅之后的返回值是Disposable****,可以使用这个对象来取消订阅,会告诉发布者停止生产对象,但不保证会立即终止

    • 当然可以给 subscribe 传递参数,自定义 complete 或者 error 时需要做的时
    • 同时可以使用 BaseSubscriber 类来实现订阅,可以控制消费的数量

    3.2 Subscriber

    消费者一般不用手动创建,通过 subscribe 传进 Consumer 函数后,会自动生成一个 LambdaSubscriber,核心方法:

    • onSubscribe
    • onNext
    • onError
    • onComplete

    3.3 Processor

    既是发布者,又是订阅者

    3.4 Subscription

    订阅,消费者调用 subscribe 方法之后可以在 onSubscribe 回调中获取,可以请求下一个 Item 或者取消订阅

    • request
    • cancel

    3.5 Thread 和 Scheduler

    没有指定的情况下:

    • 当前的 operator 使用上一个 operator 的线程,最先的 operator 使用调用 subscribe 的线程来执行

    Reactor 中使用 Scheduler 来执行流程,类似 ExecutorService

    • subscribeOn 可以指定订阅时使用的线程,这样可以不阻塞的订阅
    • publishOn 指定发布时使用的线程

    Spring Reactor 优化案例

    Spring Reactor基本介绍和案例-LMLPHP

    流程中可以优化的点:

    1. 准备数据可以异步,等需要用的时候在去阻塞获取,相当于一个 Future
    2. 召回可以完成之后就去等正排数据,新的问题,如何去重?本来拿一次正排数据,现在拿 N 个召回次数据,请求量是不是会变大,耗时是不是也会增加
    3. 过滤的准备数据也可以异步,也就是说某个过滤策略的数据准备好了,就可以去执行过滤了,而且还存在很多不需要依赖数据的过滤策略也需要等
    4. 一般粗排只需要 1000 条数据,过滤时已经拿够了 1000 条就可以跳过了

    我们上面所说的异步,其实就是说流程中某些节点是在同时执行的,不必等一个节点完成后再执行另外一个,这其实一个统筹学的问题

    Spring Reactor基本介绍和案例-LMLPHP

    4.1 解决方法对比

    **for **(StrategyConfig filterConfig : filterConfigList) {
        _doStrategyFilter_(filterChainContext, recommendContext, recRequest, filterConfig, allFilters, partitionContext, partitionTrace);
    }
    
    
    readyStrategyFlux.publishOn(ExecutorServiceHolder._scheduler_).doOnNext((readyStrategyName) -> {
        **try **{
            List<StrategyConfig> strategyConfigs = strategyNameToConfigs.get(readyStrategyName);
            **for **(StrategyConfig strategyConfig : strategyConfigs) {
                _doStrategyFilter_(filterChainContext, recommendContext, recRequest, strategyConfig, allFilters, partitionContext, partitionTrace);
            }
        } **catch **(Exception e) {
            _LOGGER_.error(**"doOnNext filter error"**, e);
        }
    
    }).blockLast();
    

    这里的 blockLast 又回到了同步世界,可以很好的和已有的代码兼容

    下面是 20240629 到 20240702 某个场景优化过滤阶段的耗时对比

    业务指标对比

    无明显波动

    总结

    Spring Reactor 是一个响应式编程框架,非常适合类似 MXN 这样的流程编排系统,也是 Java 中异步编程的一种补充,但也会有一些其他的问题,例如潜在的线程安全问题,已有框架的冲突 ThreadLocal 等

    参考

    【1】深入 Netty 逻辑架构,从 Reactor 线程模型开始(一)-阿里云开发者社区

    【2】Reactor 3 Reference Guide

    【3】C10k 问题简述-CSDN 博客

    07-04 17:31