Preface
This article takes a look at the DistroMapper in nacos.
ServerChangeListener
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/servers/ServerChangeListener.java
public interface ServerChangeListener {

    /**
     * If member list changed, this method is invoked.
     *
     * @param servers servers after change
     */
    void onChangeServerList(List<Server> servers);

    /**
     * If reachable member list changed, this method is invoked.
     *
     * @param healthyServer reachable servers after change
     */
    void onChangeHealthyServerList(List<Server> healthyServer);
}
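- ServerChangeListener defines two callbacks: onChangeServerList, invoked when the member list changes, and onChangeHealthyServerList, invoked when the reachable member list changes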
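To make the callback pattern concrete, here is a minimal, self-contained sketch. It is not the Nacos implementation: `ChangeListener`, `Notifier`, and `Recorder` are hypothetical stand-ins, plain `String` keys replace the `Server` type, and how the real ServerListManager stores and notifies its listeners is not shown in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; all names here are hypothetical stand-ins.
class ListenerSketch {

    // Simplified stand-in for ServerChangeListener, using String keys instead of Server.
    interface ChangeListener {
        void onChangeServerList(List<String> servers);

        void onChangeHealthyServerList(List<String> healthyServers);
    }

    // Hypothetical notifier: keeps registered listeners and pushes updates to them.
    static class Notifier {
        private final List<ChangeListener> listeners = new ArrayList<>();

        void listen(ChangeListener listener) {
            listeners.add(listener);
        }

        void publishHealthy(List<String> healthyServers) {
            for (ChangeListener listener : listeners) {
                listener.onChangeHealthyServerList(healthyServers);
            }
        }
    }

    // Listener that keeps a copy of the most recent reachable server keys.
    static class Recorder implements ChangeListener {
        volatile List<String> healthyList = new ArrayList<>();

        @Override
        public void onChangeServerList(List<String> servers) {
            // Intentionally empty in this sketch.
        }

        @Override
        public void onChangeHealthyServerList(List<String> healthyServers) {
            healthyList = new ArrayList<>(healthyServers);
        }
    }

    public static void main(String[] args) {
        Notifier notifier = new Notifier();
        Recorder recorder = new Recorder();
        notifier.listen(recorder);
        notifier.publishHealthy(Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848"));
        System.out.println(recorder.healthyList);
    }
}
```

The listener registers itself once and afterwards only reacts to pushed updates, so it never has to poll for the current server list.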
DistroMapper
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java
@Component("distroMapper")
public class DistroMapper implements ServerChangeListener {

    private List<String> healthyList = new ArrayList<>();

    public List<String> getHealthyList() {
        return healthyList;
    }

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private ServerListManager serverListManager;

    /**
     * init server list
     */
    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    public boolean responsible(Cluster cluster, Instance instance) {
        return switchDomain.isHealthCheckEnabled(cluster.getServiceName())
            && !cluster.getHealthCheckTask().isCancelled()
            && responsible(cluster.getServiceName())
            && cluster.contains(instance);
    }

    public boolean responsible(String serviceName) {
        if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
            return true;
        }

        if (CollectionUtils.isEmpty(healthyList)) {
            // means distro config is not ready yet
            return false;
        }

        int index = healthyList.indexOf(NetUtils.localServer());
        int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
        if (lastIndex < 0 || index < 0) {
            return true;
        }

        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    public String mapSrv(String serviceName) {
        if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
            return NetUtils.localServer();
        }

        try {
            return healthyList.get(distroHash(serviceName) % healthyList.size());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e);
            return NetUtils.localServer();
        }
    }

    public int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    @Override
    public void onChangeServerList(List<Server> latestMembers) {
    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
        List<String> newHealthyList = new ArrayList<>();
        for (Server server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        healthyList = newHealthyList;
    }
}
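- DistroMapper implements the ServerChangeListener interface and registers itself with serverListManager in init; its onChangeHealthyServerList method rebuilds healthyList from the keys of the reachable servers, while onChangeServerList is left empty
- It provides two responsible methods. responsible(Cluster, Instance) returns true only when switchDomain.isHealthCheckEnabled is true for the service, cluster.getHealthCheckTask() is not cancelled, responsible(serviceName) returns true, and the cluster contains the instance. responsible(String) returns true when distro is disabled or the server runs in standalone mode, false when healthyList is empty, and true when the local server is not in healthyList; otherwise it computes distroHash(serviceName) % healthyList.size() and checks whether that target falls between the first and last index of the local server in healthyList
- It also provides the mapSrv method, which returns the local server when healthyList is empty or distro is disabled; otherwise it takes distroHash(serviceName) modulo the size of healthyList and returns the server key at that index, falling back to the local server if an exception is thrown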
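The hash-and-modulo mapping described above can be sketched in isolation. This is an illustrative sketch rather than the Nacos class: `DistroHashSketch` is a hypothetical name, the local server key is passed in as a parameter in place of NetUtils.localServer(), and the switchDomain and standalone checks are left out. Only distroHash uses the same formula as the listing.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; not the Nacos DistroMapper itself.
class DistroHashSketch {

    // Same formula as distroHash in the listing above.
    static int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    // Mirrors mapSrv: the owner is the server at index hash % size.
    // localServer is an assumed parameter standing in for NetUtils.localServer().
    static String mapSrv(String serviceName, List<String> healthyList, String localServer) {
        if (healthyList == null || healthyList.isEmpty()) {
            return localServer;
        }
        return healthyList.get(distroHash(serviceName) % healthyList.size());
    }

    // Mirrors the index-range check of responsible(String),
    // without the distro-enabled and standalone-mode checks.
    static boolean responsible(String serviceName, List<String> healthyList, String localServer) {
        if (healthyList == null || healthyList.isEmpty()) {
            return false;
        }
        int index = healthyList.indexOf(localServer);
        int lastIndex = healthyList.lastIndexOf(localServer);
        if (lastIndex < 0 || index < 0) {
            return true;
        }
        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    public static void main(String[] args) {
        List<String> healthy = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        // "a".hashCode() is 97, and 97 % 3 == 1, so the second server owns service "a".
        System.out.println(mapSrv("a", healthy, "10.0.0.1:8848"));
        System.out.println(responsible("a", healthy, "10.0.0.2:8848"));
        System.out.println(responsible("a", healthy, "10.0.0.1:8848"));
    }
}
```

Because the target index depends on healthyList.size(), a service can map to a different server whenever the reachable server list changes, which is why healthyList is refreshed through onChangeHealthyServerList.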
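Summary
ServerChangeListener defines the onChangeServerList and onChangeHealthyServerList methods. DistroMapper implements ServerChangeListener, and its onChangeHealthyServerList method updates healthyList. DistroMapper also provides the responsible methods, which decide whether the local server is responsible for a service, and the mapSrv method, which maps a service name to the key of the server that owns it.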