SPI配置的默认实现
cache=com.alibaba.dubbo.cache.filter.CacheFilter validation=com.alibaba.dubbo.validation.filter.ValidationFilter echo=com.alibaba.dubbo.rpc.filter.EchoFilter generic=com.alibaba.dubbo.rpc.filter.GenericFilter genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter token=com.alibaba.dubbo.rpc.filter.TokenFilter accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter context=com.alibaba.dubbo.rpc.filter.ContextFilter consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
Consumer
ConsumerContextFilter
记录一些基础信息到RpcContext
/**
 * Filter that records basic information about the current call on the
 * {@link RpcContext} before delegating to the next invoker, and clears the
 * context attachments once the call has finished.
 */
@Activate(
    group = {"consumer"},
    order = -10000
)
public class ConsumerContextFilter implements Filter {

    public ConsumerContextFilter() {
    }

    /**
     * Populates the current {@link RpcContext} and forwards the invocation.
     *
     * @param invoker    the invoker the call is delegated to
     * @param invocation the invocation being processed
     * @return whatever {@code invoker.invoke(invocation)} returns
     * @throws RpcException propagated from the delegated invocation
     */
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Per the original note, getContext() hands back the context cached for the
        // current thread; the basic facts about this request are stored on it.
        RpcContext context = RpcContext.getContext();
        context.setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

        // Let an RpcInvocation know which invoker is handling it.
        if (invocation instanceof RpcInvocation) {
            RpcInvocation rpcInvocation = (RpcInvocation) invocation;
            rpcInvocation.setInvoker(invoker);
        }

        try {
            // Per the original note: client-side parameters travel with the invocation,
            // so custom data (for example a traceId) can be carried along as well.
            return invoker.invoke(invocation);
        } finally {
            // Always drop the attachments, whether the call returned or threw.
            RpcContext.getContext().clearAttachments();
        }
    }
}
ActiveLimitFilter
ActiveLimitFilter主要用于 限制同一个客户端对于一个服务端方法的并发调用量(客户端限流)。
/** * 控制调用服务的并发量 限流 * 同时支持多少请求 */ @Activate( group = {"consumer"}, value = {"actives"} ) public class ActiveLimitFilter implements Filter { public ActiveLimitFilter() { } public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); //获得 <dubbo:reference actives="1"> actives的数量 int max = invoker.getUrl().getMethodParameter(methodName, "actives", 0); //获取当前service当前方法的请求数量 RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); long timeout; //配置并发控制大于0才写 if (max > 0) { //获得当前方法的等待时间 timeout = (long)invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", 0); long start = System.currentTimeMillis(); long remain = timeout; //判断是否大于并发数 如果大于则等待 int active = count.getActive(); if (active >= max) { synchronized(count) { /** *1.while循环是有必要的 * 当收到其他线程notify 获得执行权 * 但是这个时候其他线程提前进入(active >= max) 判断为false获得执行权 count+1 * 这个时候 还需要while判断是否还有空闲请求 否则继续wait * */ while((active = count.getActive()) >= max) { try { //超时时间为 配置的超时时间 count.wait(remain); } catch (InterruptedException var32) { ; } long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; //当其他线程通知等待线程执行 判断是否超时 如果超时了则不执行了 if (remain <= 0L) { throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". 
max concurrent invoke limit: " + max); } } } } } boolean var28 = false; Result var10; try { var28 = true; timeout = System.currentTimeMillis(); //获得执行权的 count+1 RpcStatus.beginCount(url, methodName); try { //执行 Result result = invoker.invoke(invocation); //执行完毕关闭 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - timeout, true); var10 = result; var28 = false; } catch (RuntimeException var31) { RpcStatus.endCount(url, methodName, System.currentTimeMillis() - timeout, false); throw var31; } } finally { if (var28) { if (max > 0) { //通知等待的线程执行 synchronized(count) { count.notify(); } } } } if (max > 0) { synchronized(count) { //通知等待的线程执行 count.notify(); } } return var10; } }