负载均衡

在之前集群的文章中,我们分析了通过监听注册中心可以获取到多个服务提供者,并创建多个Invoker,然后通过集群类如FailoverClusterInvoker将多个Invoker封装在一起,而外部的调用者以这个封装的Invoker为入口调用内部的多个Invoker,但是我们一次调用实际只能调用一个真实的Invoker(这里的真实的Invoker对应一个提供者),所以怎么在多个Invoker中选择出一个Invoker来,使得整体的服务调用的性能最大化,这就是负载均衡策略。另外,除了负载均衡,集群类的一个重要功能就是处理服务调用故障,当一个invoker调用失败后怎么办,根据对这种情况的不同处理策略分为不同的集群类。当然除了以上两个功能,集群类的另一个功能就是粘滞调用,这里不再赘述。
值得一提的是,消费端的Invoker其实是一个分层的结构,我们可以在配置文件中指定多个注册中心,这样就会封装出多个Invoker(每个Invoker对应一个注册中心),对于这多个Invoker再通过集群类的join方法封装为一个总的Invoker(StaticDirectory),所以集群类在执行故障处理,负载均衡策略时实际上也是分层级的。
好了,集群类的回顾就到这里,说这么多主要是想引出一点:负载均衡逻辑的执行是在集群类中,所以分析负载均衡就以集群类为切入点。
dubbo提供了五种负载均衡器,下面我们一一分析。

RandomLoadBalance(随机)

光看名字,会以为真的就是从Invoker列表中随机选择一个,实际上并不是这么简单,dubbo的随机负载均衡考虑到了权重因素,每个服务提供者的每个方法都有一个有一个权重,权重大的意味着被调用的可能性更大。 so, talk is cheap ,show me your code !

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Number of invokers
    int length = invokers.size();
    // Every invoker has the same weight?
    // 表示是否所有的invoker的权重都相等??
    boolean sameWeight = true;
    // the weight of every invokers
    // 用于记录invoker权重的数组
    int[] weights = new int[length];
    // the first invoker's weight
    // 第一个invoker的权重
    int firstWeight = getWeight(invokers.get(0), invocation);
    weights[0] = firstWeight;
    // The sum of weights
    // 所有invoker权重的和
    int totalWeight = firstWeight;
    // 遍历所有的invoker,获取权重,更新簿记量
    for (int i = 1; i < length; i++) {
        // 获取权重,默认是100
        // 考虑到了服务预热的特性,
        // 所谓服务预热是指服务在刚启动时不会一下子被赋予全部的权重,而是缓慢第达到设定的权重值
        // 默认权重是100, 默认的预热时间是10分钟
        int weight = getWeight(invokers.get(i), invocation);
        // save for later use
        // 记录权重
        weights[i] = weight;
        // Sum
        // 记录权重和
        totalWeight += weight;
        // 如果与第一个权重不相等,那么更新簿记量
        if (sameWeight && weight != firstWeight) {
            sameWeight = false;
        }
    }
    // 如果invoker的权重有区别,那么需要根据权重来随机
    if (totalWeight > 0 && !sameWeight) {
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        // 这是根据权重值随机的典型用法
        for (int i = 0; i < length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    // 如果所有invoker的权重相等,或者权重和为0, 那么随机选择一个invoker
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

这里指的关注的是所谓的随机并不是完全随机,而是在考虑权重的基础上的随机,这种算法值得我们学习,以后再需要的场景中也可以这样做。
此外,dubbo还有服务预热的特性,就像我在注释中说的,这种策略应该是为了避免在服务刚启动时就涌上来大量的请求,导致服务崩溃,设置一个缓冲时间可以有效避免这种情况,默认的预热时间是10分钟,默认的权重值是100。

RoundRobinLoadBalance(轮询)

接下来,我们分析一下轮询调度算法。

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // 方法的key值
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
    if (map == null) {
        // 注意,这里使用的是putIfAbsent方法,考虑到并发问题
        methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
        map = methodWeightMap.get(key);
    }
    int totalWeight = 0;
    // 簿记量,记录当前的最大权重
    long maxCurrent = Long.MIN_VALUE;
    long now = System.currentTimeMillis();
    // 被选中的Invoker
    Invoker<T> selectedInvoker = null;
    WeightedRoundRobin selectedWRR = null;
    for (Invoker<T> invoker : invokers) {
        // 获取该url的唯一标识
        String identifyString = invoker.getUrl().toIdentityString();
        WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
        // 获取权重,考虑到预热
        int weight = getWeight(invoker, invocation);

        if (weightedRoundRobin == null) {
            weightedRoundRobin = new WeightedRoundRobin();
            // 设置权重
            weightedRoundRobin.setWeight(weight);
            // 仍然调用putIfAbsent方法
            map.putIfAbsent(identifyString, weightedRoundRobin);
            // TODO 个人认为这里应该加下面这句
            // weightedRoundRobin = map.get(identifyString);
        }
        if (weight != weightedRoundRobin.getWeight()) {
            //weight changed
            // 更新权重
            weightedRoundRobin.setWeight(weight);
        }
        // 获取当前的累积权重
        long cur = weightedRoundRobin.increaseCurrent();
        // 设置更新时间
        weightedRoundRobin.setLastUpdate(now);
        // 如果累积权重大于当前的最大权重,那么将当前选中的invoker设为这个invoker
        // 并更新最大权重值
        if (cur > maxCurrent) {
            maxCurrent = cur;
            selectedInvoker = invoker;
            selectedWRR = weightedRoundRobin;
        }
        // 累加权重和
        totalWeight += weight;
    }

    // 如果invoker有变化,那么将那些长时间未被选到的WeightedRoundRobin清除出去
    // 这里用AtomicBoolean来加锁,cas锁
    if (!updateLock.get() && invokers.size() != map.size()) {
        if (updateLock.compareAndSet(false, true)) {
            try {
                // copy -> modify -> update reference
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                newMap.putAll(map);
                Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, WeightedRoundRobin> item = it.next();
                    if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                        it.remove();
                    }
                }
                methodWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }
    }
    if (selectedInvoker != null) {
        // 这一句很重要
        // 选中这个invoker之后,将他的累积权重减去这一轮的总权重,
        // 相当于将其放到队尾,当然仍然还是考虑到权重的,权重的大的即是减去总权重也还是有较高的累积权重
        selectedWRR.sel(totalWeight);
        return selectedInvoker;
    }
    // should not happen here
    // 逻辑上将,代码是不会到这里的
    // 但是由于语法的要求这里必须要有返回语句
    return invokers.get(0);
}

这里的轮询仍然是考虑到权重因素的,这里对于轮询的实现很巧妙,使用累积权重的方法,巧妙地将权重因素考虑进去;而当所有的服务提供者的权重都相等时,这种算法就会退化为标准的轮询算法,实际上累积权重在这里还起到了记忆轮询状态的作用。可以这么形象地理解,当调用过某个提供者后,就把这个提供者扔到靠后面的一个位置,权重越小越靠后,这样就实现了轮询的功能。

LeastActiveLoadBalance

// 最小活跃数,考虑权重因素
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Number of invokers
    int length = invokers.size();
    // The least active value of all invokers
    // 用于记录最小活跃数
    int leastActive = -1;
    // The number of invokers having the same least active value (leastActive)
    // 相同最小活跃数的invoker数量
    int leastCount = 0;
    // The index of invokers having the same least active value (leastActive)
    // 拥有相同的最小活跃数的那些invoker的下标
    int[] leastIndexes = new int[length];
    // the weight of every invokers
    // 每个invoker的权重
    int[] weights = new int[length];
    // The sum of the warmup weights of all the least active invokes
    // 权重和
    int totalWeight = 0;
    // The weight of the first least active invoke
    // 第一个最小活跃数的invoker的权重
    int firstWeight = 0;
    // Every least active invoker has the same weight value?
    // 如果有多个相同最小活跃数的invoker,他们的权重是否相同
    boolean sameWeight = true;


    // Filter out all the least active invokers
    // 这个循环的作用是选出最小活跃数的invoker,如果有多个相同最小活跃数的invoker,全部选择出来
    for (int i = 0; i < length; i++) {
        Invoker<T> invoker = invokers.get(i);
        // Get the active number of the invoke
        int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
        // Get the weight of the invoke configuration. The default value is 100.
        int afterWarmup = getWeight(invoker, invocation);
        // save for later use
        weights[i] = afterWarmup;
        // If it is the first invoker or the active number of the invoker is less than the current least active number
        if (leastActive == -1 || active < leastActive) {
            // Reset the active number of the current invoker to the least active number
            leastActive = active;
            // Reset the number of least active invokers
            leastCount = 1;
            // Put the first least active invoker first in leastIndexs
            leastIndexes[0] = i;
            // Reset totalWeight
            totalWeight = afterWarmup;
            // Record the weight the first least active invoker
            firstWeight = afterWarmup;
            // Each invoke has the same weight (only one invoker here)
            sameWeight = true;
            // If current invoker's active value equals with leaseActive, then accumulating.
        } else if (active == leastActive) {
            // Record the index of the least active invoker in leastIndexs order
            leastIndexes[leastCount++] = i;
            // Accumulate the total weight of the least active invoker
            totalWeight += afterWarmup;
            // If every invoker has the same weight?
            if (sameWeight && i > 0
                    && afterWarmup != firstWeight) {
                sameWeight = false;
            }
        }
    }
    // Choose an invoker from all the least active invokers
    // 如果最小活跃数的invoker只有一个,那么就不用考虑权重因素
    if (leastCount == 1) {
        // If we got exactly one invoker having the least active value, return this invoker directly.
        return invokers.get(leastIndexes[0]);
    }
    // 如果他们的权重不同,那么需要考虑权重的因素
    // 这个方法与RandomLoadBalance中类似
    if (!sameWeight && totalWeight > 0) {
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
        // totalWeight.
        int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        for (int i = 0; i < leastCount; i++) {
            int leastIndex = leastIndexes[i];
            offsetWeight -= weights[leastIndex];
            if (offsetWeight < 0) {
                return invokers.get(leastIndex);
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    // 如果他们的权重相同,那就随机选择一个
    return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}

这个方法相信大家都能看懂,这里我想重点说一下最小活跃数的维护方法。
最小活跃数是通过RpcStatus类维护的,这个类对于每个服务的每个方法都维护了一个RpcStatus类,可以看出来,dubbo对于负载均衡是细化到方法级别的,粒度还是很细的,从前面轮询算法也可以看出来这点。
RpcStatus类中除了维护最小活跃数,还维护了调用事件,总调用时间,调用次数,失败的调用次数等等统计量,那么问题是:这些统计量是在什么时候更新的呢?你肯定会回答当然是在服务调用的时候更新的。那么在代码中是怎么保证在每次服务调用的时候都更新相应的RpcStatus类呢??答案就是ExtensionLoader的AOP特性,可见dubbo的spi机制真是无处不在,贯穿了框架的各个部分。ExtensionLoader在加载一个接口的实现类的时候会将一些包装类缓存起来,当实例化一个实现类的时候会用这些包装类对实例进行层层包装(通过构造方法),这样就实现了多层代理,每个包装类都会加入一些通用的功能,实际上就是切面的思想。 服务调用的统计其实正是一个通用的功能,很适合作为一个横切面,切入所有的Invoker中。而消费端Invoker的创建是在Protocol.refer方法中,我们看一下Protocol实现类,发现了一些包装类:

  • ProtocolFilterWrapper, 嵌入可用的Filter
  • ProtocolListenerWrapper, 加入监听器
  • QosProtocolWrapper, 启动QOS服务

最小活跃数功能的实现是通过过滤器。ProtocolFilterWrapper.refer方法在创建invoker时会通过层层代理的方式形成一个过滤器链,注意对于注册中心的Invoker是不用添加过滤器链的,只有对真正执行远程调用的那些Invoker才会添加过滤器链。

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

Filter也是一个SPI接口,是通过ExtensionLoader加载的,在加载Filter接口是调用了getActivateExtension方法,与Activate注解有关。
活跃数的功能是通过ActiveLimitFilter过滤器实现的。

ActiveLimitFilter.invoke

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    // 调用的方法名
    String methodName = invocation.getMethodName();
    // 最大活跃数,默认是0,0表示不限制,这个逻辑在RpcStatus.beginCount中
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
    // 获取方法对应的RpcStatus对象
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    // 如果增加活跃数失败,就需要等待,
    // 这里的活跃数类似于信号量,作用是控制最大并发量
    if (!count.beginCount(url, methodName, max)) {
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
        long start = System.currentTimeMillis();
        long remain = timeout;
        synchronized (count) {
            // 等待活跃数降下来,并试图获取到一个活跃数,知道超时
            while (!count.beginCount(url, methodName, max)) {
                try {
                    count.wait(remain);
                } catch (InterruptedException e) {
                    // ignore
                }
                long elapsed = System.currentTimeMillis() - start;
                remain = timeout - elapsed;
                if (remain <= 0) {
                    throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                            + invoker.getInterface().getName() + ", method: "
                            + invocation.getMethodName() + ", elapsed: " + elapsed
                            + ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
                            + ". max concurrent invoke limit: " + max);
                }
            }
        }
    }

    // 到这里说明成功获取到一个活跃数,那么可以继续后面的调用
    boolean isSuccess = true;
    long begin = System.currentTimeMillis();
    try {
        return invoker.invoke(invocation);
    } catch (RuntimeException t) {
        isSuccess = false;
        throw t;
    } finally {
        // 调用结束,释放获取到的活跃数
        count.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
        // 这里指的思考一下,为什么只有在max>0时才通知
        // 因为max<=0时,最大活跃数是Integer.MAX_VALUE, 相当于不设上限,
        // 所以在前面的循环中第一次就能获取到活跃数,压根不会进入循环,也就不会获取锁等待,
        // 所以这里也就不需要通知了
        if (max > 0) {
            synchronized (count) {
                count.notifyAll();
            }
        }
    }
}

注释已经很清楚,就不再赘述。

ConsistentHashLoadBalance

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";

private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String methodName = RpcUtils.getMethodName(invocation);
    // 以方法为粒度
    String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
    // invokers的唯一标示,
    int identityHashCode = System.identityHashCode(invokers);
    ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
    // 如果invokers列表变了,就重新hash
    if (selector == null || selector.identityHashCode != identityHashCode) {
        selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
        selector = (ConsistentHashSelector<T>) selectors.get(key);
    }
    return selector.select(invocation);
}

private static final class ConsistentHashSelector<T> {

    private final TreeMap<Long, Invoker<T>> virtualInvokers;

    private final int replicaNumber;

    private final int identityHashCode;

    private final int[] argumentIndex;

    ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
        // 虚拟节点,用于使请求更均匀
        this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
        this.identityHashCode = identityHashCode;
        URL url = invokers.get(0).getUrl();
        // 虚拟节点个数
        this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
        // 参与生成hash值的参数的序号
        String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
        argumentIndex = new int[index.length];
        for (int i = 0; i < index.length; i++) {
            argumentIndex[i] = Integer.parseInt(index[i]);
        }
        // 初始化的时候生成hash环
        for (Invoker<T> invoker : invokers) {
            // 一个地址(ip:port)生成的hash节点实际上是固定的
            String address = invoker.getUrl().getAddress();
            for (int i = 0; i < replicaNumber / 4; i++) {
                byte[] digest = md5(address + i);
                for (int h = 0; h < 4; h++) {
                    // 生成的hash值尽量散列
                    long m = hash(digest, h);
                    virtualInvokers.put(m, invoker);
                }
            }
        }
    }


    public Invoker<T> select(Invocation invocation) {
        // 用指定的参数参与生成hash
        String key = toKey(invocation.getArguments());
        byte[] digest = md5(key);
        // 只要参数相同,那么生成的hash值就是相同的,选择到的invoker就是相同的
        // 这也就保证了相同的调用(同一个服务的,并且调用参数相同),总会被分配到固定的提供者
        return selectForKey(hash(digest, 0));
    }

    private String toKey(Object[] args) {
        StringBuilder buf = new StringBuilder();
        for (int i : argumentIndex) {
            if (i >= 0 && i < args.length) {
                buf.append(args[i]);
            }
        }
        return buf.toString();
    }

    private Invoker<T> selectForKey(long hash) {
        Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
        // entry == null说明hash比treeMap中最大的hash值还大,
        // 根据一致性hash环的算法,这是需要绕回第一个节点,即hash最小的那个节点
        if (entry == null) {
            entry = virtualInvokers.firstEntry();
        }
        return entry.getValue();
    }

    // 散列函数,这种方法使生成的hash尽量均匀分散
    private long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    private byte[] md5(String value) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        md5.reset();
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        md5.update(bytes);
        return md5.digest();
    }
}

}

首先我们要理解一致性hash的算法原理,明白一致性hash算法之后再来看这个类,就简单多了。
值得注意的是,这里使用了TreeMap保存hash环的所有hash值值得借鉴,这样做的好处是查找快速,效率很高。

05-12 09:38