序
本文主要研究一下nacos NamingProxy的getServiceList
NamingProxy.initRefreshSrvIfNeed
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java
public class NamingProxy {
private static final int DEFAULT_SERVER_PORT = 8848;
private int serverPort = DEFAULT_SERVER_PORT;
private String namespaceId;
private String endpoint;
private String nacosDomain;
private List<String> serverList;
private List<String> serversFromEndpoint = new ArrayList<String>();
private long lastSrvRefTime = 0L;
private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);
private Properties properties;
public NamingProxy(String namespaceId, String endpoint, String serverList) {
this.namespaceId = namespaceId;
this.endpoint = endpoint;
if (StringUtils.isNotEmpty(serverList)) {
this.serverList = Arrays.asList(serverList.split(","));
if (this.serverList.size() == 1) {
this.nacosDomain = serverList;
}
}
initRefreshSrvIfNeed();
}
private void initRefreshSrvIfNeed() {
if (StringUtils.isEmpty(endpoint)) {
return;
}
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.naming.serverlist.updater");
t.setDaemon(true);
return t;
}
});
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
refreshSrvIfNeed();
}
}, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
refreshSrvIfNeed();
}
//......
private void refreshSrvIfNeed() {
try {
if (!CollectionUtils.isEmpty(serverList)) {
NAMING_LOGGER.debug("server list provided by user: " + serverList);
return;
}
if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) {
return;
}
List<String> list = getServerListFromEndpoint();
if (CollectionUtils.isEmpty(list)) {
throw new Exception("Can not acquire Nacos list");
}
if (!CollectionUtils.isEqualCollection(list, serversFromEndpoint)) {
NAMING_LOGGER.info("[SERVER-LIST] server list is updated: " + list);
}
serversFromEndpoint = list;
lastSrvRefTime = System.currentTimeMillis();
} catch (Throwable e) {
NAMING_LOGGER.warn("failed to update server list", e);
}
}
public List<String> getServerListFromEndpoint() {
try {
String urlString = "http://" + endpoint + "/nacos/serverlist";
List<String> headers = builderHeaders();
HttpClient.HttpResult result = HttpClient.httpGet(urlString, headers, null, UtilAndComs.ENCODING);
if (HttpURLConnection.HTTP_OK != result.code) {
throw new IOException("Error while requesting: " + urlString + "'. Server returned: "
+ result.code);
}
String content = result.content;
List<String> list = new ArrayList<String>();
for (String line : IoUtils.readLines(new StringReader(content))) {
if (!line.trim().isEmpty()) {
list.add(line.trim());
}
}
return list;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//......
}
- NamingProxy的构造器执行了initRefreshSrvIfNeed方法,该方法在endpoint不为空的时候,会注册一个定时任务,每隔vipSrvRefInterMillis时间执行一次refreshSrvIfNeed方法,同时立马调用了refreshSrvIfNeed方法
- refreshSrvIfNeed方法在serverList为空,且距离lastSrvRefTime大于等于vipSrvRefInterMillis时会通过getServerListFromEndpoint()方法获取serverList更新serversFromEndpoint及lastSrvRefTime
- getServerListFromEndpoint方法会向endpoint请求/serverlist接口,获取server端返回的serverList
NamingProxy.getServiceList
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java
public class NamingProxy {
private static final int DEFAULT_SERVER_PORT = 8848;
private int serverPort = DEFAULT_SERVER_PORT;
private String namespaceId;
private String endpoint;
private String nacosDomain;
private List<String> serverList;
private List<String> serversFromEndpoint = new ArrayList<String>();
private long lastSrvRefTime = 0L;
private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);
private Properties properties;
//......
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName) throws NacosException {
return getServiceList(pageNo, pageSize, groupName, null);
}
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {
Map<String, String> params = new HashMap<String, String>(4);
params.put("pageNo", String.valueOf(pageNo));
params.put("pageSize", String.valueOf(pageSize));
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.GROUP_NAME, groupName);
if (selector != null) {
switch (SelectorType.valueOf(selector.getType())) {
case none:
break;
case label:
ExpressionSelector expressionSelector = (ExpressionSelector) selector;
params.put("selector", JSON.toJSONString(expressionSelector));
break;
default:
break;
}
}
String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/service/list", params);
JSONObject json = JSON.parseObject(result);
ListView<String> listView = new ListView<String>();
listView.setCount(json.getInteger("count"));
listView.setData(JSON.parseObject(json.getString("doms"), new TypeReference<List<String>>() {
}));
return listView;
}
public String reqAPI(String api, Map<String, String> params) throws NacosException {
List<String> snapshot = serversFromEndpoint;
if (!CollectionUtils.isEmpty(serverList)) {
snapshot = serverList;
}
return reqAPI(api, params, snapshot);
}
public String reqAPI(String api, Map<String, String> params, List<String> servers) {
return reqAPI(api, params, servers, HttpMethod.GET);
}
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new IllegalArgumentException("no server available");
}
Exception exception = new Exception();
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, server, method);
} catch (NacosException e) {
exception = e;
NAMING_LOGGER.error("request {} failed.", server, e);
} catch (Exception e) {
exception = e;
NAMING_LOGGER.error("request {} failed.", server, e);
}
index = (index + 1) % servers.size();
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
return callServer(api, params, nacosDomain);
} catch (Exception e) {
exception = e;
NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}
//......
}
- getServiceList方法有个AbstractSelector参数,它会往请求的参数里头添加selector参数,目前label类型会添加ExpressionSelector,之后调用reqAPI方法请求/service/list接口
- reqAPI方法首先将serversFromEndpoint赋值给snapshot,但是serverList不为空的情况下会重置snapshot为serverList,然后进行reqAPI请求
- reqAPI方法会根据servers.size()随机一个index,然后以servers.size()为最大循环次数开始for循环,循环里头根据index获取server然后通过callServer请求,请求成功则跳出循环返回,请求失败则递增index并对servers.size()取余继续下次循环,如果都请求失败则最后抛出IllegalStateException
小结
- NamingProxy的构造器执行了initRefreshSrvIfNeed方法,该方法在endpoint不为空的时候,会注册一个定时任务,每隔vipSrvRefInterMillis时间执行一次refreshSrvIfNeed方法
- refreshSrvIfNeed方法在serverList为空,且距离lastSrvRefTime大于等于vipSrvRefInterMillis时会通过getServerListFromEndpoint()方法获取serverList更新serversFromEndpoint及lastSrvRefTime
- getServiceList方法优先以serverList作为server端地址列表,如果它为空再以serversFromEndpoint为准,然后通过reqAPI方法请求的时候,随机选择一个server进行请求,最多请求server.size()次,请求成功则跳出循环返回,请求失败则递增index并对servers.size()取余继续下次循环,如果都请求失败则最后抛出IllegalStateException