序
本文主要研究一下NacosNamingService的selectOneHealthyInstance
NacosNamingService
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java
public class NacosNamingService implements NamingService {
private static final String DEFAULT_PORT = "8080";
private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
/**
* Each Naming instance should have different namespace.
*/
private String namespace;
private String endpoint;
private String serverList;
private String cacheDir;
private String logName;
private HostReactor hostReactor;
private BeatReactor beatReactor;
private EventDispatcher eventDispatcher;
private NamingProxy serverProxy;
//......
@Override
public Instance selectOneHealthyInstance(String serviceName) throws NacosException {
return selectOneHealthyInstance(serviceName, new ArrayList<String>());
}
@Override
public Instance selectOneHealthyInstance(String serviceName, String groupName) throws NacosException {
return selectOneHealthyInstance(serviceName, groupName, true);
}
@Override
public Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException {
return selectOneHealthyInstance(serviceName, new ArrayList<String>(), subscribe);
}
@Override
public Instance selectOneHealthyInstance(String serviceName, String groupName, boolean subscribe) throws NacosException {
return selectOneHealthyInstance(serviceName, groupName, new ArrayList<String>(), subscribe);
}
@Override
public Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException {
return selectOneHealthyInstance(serviceName, clusters, true);
}
@Override
public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters) throws NacosException {
return selectOneHealthyInstance(serviceName, groupName, clusters, true);
}
@Override
public Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe)
throws NacosException {
return selectOneHealthyInstance(serviceName, Constants.DEFAULT_GROUP, clusters, subscribe);
}
@Override
public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
if (subscribe) {
return Balancer.RandomByWeight.selectHost(
hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
} else {
return Balancer.RandomByWeight.selectHost(
hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));
}
}
//......
}
- selectOneHealthyInstance跟selectInstances类似,只不过它返回的是单个instance;selectOneHealthyInstance也是先从hostReactor获取serviceInfo
- 如果subscribe为true,则执行hostReactor.getServiceInfo获取serviceInfo,否则执行hostReactor.getServiceInfoDirectlyFromServer获取serviceInfo
- 获取到serviceInfo之后,selectOneHealthyInstance通过Balancer.RandomByWeight.selectHost方法来选取单个healthy的instance
Balancer
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/Balancer.java
public class Balancer {
/**
* report status to server
*/
public final static List<String> UNCONSISTENT_SERVICE_WITH_ADDRESS_SERVER = new CopyOnWriteArrayList<String>();
public static class RandomByWeight {
public static List<Instance> selectAll(ServiceInfo serviceInfo) {
List<Instance> hosts = serviceInfo.getHosts();
if (CollectionUtils.isEmpty(hosts)) {
throw new IllegalStateException("no host to srv for serviceInfo: " + serviceInfo.getName());
}
return hosts;
}
public static Instance selectHost(ServiceInfo dom) {
List<Instance> hosts = selectAll(dom);
if (CollectionUtils.isEmpty(hosts)) {
throw new IllegalStateException("no host to srv for service: " + dom.getName());
}
return getHostByRandomWeight(hosts);
}
}
/**
* Return one host from the host list by random-weight.
*
* @param hosts The list of the host.
* @return The random-weight result of the host
*/
protected static Instance getHostByRandomWeight(List<Instance> hosts) {
NAMING_LOGGER.debug("entry randomWithWeight");
if (hosts == null || hosts.size() == 0) {
NAMING_LOGGER.debug("hosts == null || hosts.size() == 0");
return null;
}
Chooser<String, Instance> vipChooser = new Chooser<String, Instance>("www.taobao.com");
NAMING_LOGGER.debug("new Chooser");
List<Pair<Instance>> hostsWithWeight = new ArrayList<Pair<Instance>>();
for (Instance host : hosts) {
if (host.isHealthy()) {
hostsWithWeight.add(new Pair<Instance>(host, host.getWeight()));
}
}
NAMING_LOGGER.debug("for (Host host : hosts)");
vipChooser.refresh(hostsWithWeight);
NAMING_LOGGER.debug("vipChooser.refresh");
return vipChooser.randomWithWeight();
}
}
- Balancer的RandomByWeight提供了selectAll及selectHost方法;selectAll针对serviceInfo.getHosts()进行了空判断,空的话会抛出IllegalStateException
- selectHost方法内部调用了selectAll方法,其最后通过getHostByRandomWeight来选取单个healthy的instance
- getHostByRandomWeight方法首先创建一个Chooser,然后选取healthy的instance构造hostsWithWeight,再通过vipChooser.refresh(hostsWithWeight)进行refresh,最后通过vipChooser.randomWithWeight()选取单个healthy的instance
Chooser
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java
public class Chooser<K, T> {
private K uniqueKey;
private volatile Ref<T> ref;
public T random() {
List<T> items = ref.items;
if (items.size() == 0) {
return null;
}
if (items.size() == 1) {
return items.get(0);
}
return items.get(ThreadLocalRandom.current().nextInt(items.size()));
}
public T randomWithWeight() {
Ref<T> ref = this.ref;
double random = ThreadLocalRandom.current().nextDouble(0, 1);
int index = Arrays.binarySearch(ref.weights, random);
if (index < 0) {
index = -index - 1;
} else {
return ref.items.get(index);
}
if (index >= 0 && index < ref.weights.length) {
if (random < ref.weights[index]) {
return ref.items.get(index);
}
}
/* This should never happen, but it ensures we will return a correct
* object in case there is some floating point inequality problem
* wrt the cumulative probabilities. */
return ref.items.get(ref.items.size() - 1);
}
public Chooser(K uniqueKey) {
this(uniqueKey, new ArrayList<Pair<T>>());
}
public Chooser(K uniqueKey, List<Pair<T>> pairs) {
Ref<T> ref = new Ref<T>(pairs);
ref.refresh();
this.uniqueKey = uniqueKey;
this.ref = ref;
}
public K getUniqueKey() {
return uniqueKey;
}
public Ref<T> getRef() {
return ref;
}
public void refresh(List<Pair<T>> itemsWithWeight) {
Ref<T> newRef = new Ref<T>(itemsWithWeight);
newRef.refresh();
newRef.poller = this.ref.poller.refresh(newRef.items);
this.ref = newRef;
}
//......
}
- Chooser的refresh方法会根据itemsWithWeight创建Ref,然后执行Ref的refresh方法;randomWithWeight方法通过Arrays.binarySearch(ref.weights, random)创建初始index,然后根据index从ref.items获取元素
Ref
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/utils/Chooser.java
public class Ref<T> {
private List<Pair<T>> itemsWithWeight = new ArrayList<Pair<T>>();
private List<T> items = new ArrayList<T>();
private Poller<T> poller = new GenericPoller<T>(items);
private double[] weights;
@SuppressWarnings("unchecked")
public Ref(List<Pair<T>> itemsWithWeight) {
this.itemsWithWeight = itemsWithWeight;
}
public void refresh() {
Double originWeightSum = (double) 0;
for (Pair<T> item : itemsWithWeight) {
double weight = item.weight();
//ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
if (weight <= 0) {
continue;
}
items.add(item.item());
if (Double.isInfinite(weight)) {
weight = 10000.0D;
}
if (Double.isNaN(weight)) {
weight = 1.0D;
}
originWeightSum += weight;
}
double[] exactWeights = new double[items.size()];
int index = 0;
for (Pair<T> item : itemsWithWeight) {
double singleWeight = item.weight();
//ignore item which weight is zero.see test_randomWithWeight_weight0 in ChooserTest
if (singleWeight <= 0) {
continue;
}
exactWeights[index++] = singleWeight / originWeightSum;
}
weights = new double[items.size()];
double randomRange = 0D;
for (int i = 0; i < index; i++) {
weights[i] = randomRange + exactWeights[i];
randomRange += exactWeights[i];
}
double doublePrecisionDelta = 0.0001;
if (index == 0 || (Math.abs(weights[index - 1] - 1) < doublePrecisionDelta)) {
return;
}
throw new IllegalStateException("Cumulative Weight caculate wrong , the sum of probabilities does not equals 1.");
}
//......
}
- Ref的refresh方法主要是初始化items及weights
小结
- selectOneHealthyInstance跟selectInstances类似,只不过它返回的是单个instance;selectOneHealthyInstance也是先从hostReactor获取serviceInfo
- 如果subscribe为true,则执行hostReactor.getServiceInfo获取serviceInfo,否则执行hostReactor.getServiceInfoDirectlyFromServer获取serviceInfo
- 获取到serviceInfo之后,selectOneHealthyInstance通过Balancer.RandomByWeight.selectHost方法来选取单个healthy的instance