序
本文主要研究一下nacos Service的processClientBeat
Service.processClientBeat
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";
@JSONField(serialize = false)
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
private String token;
private List<String> owners = new ArrayList<>();
private Boolean resetWeight = false;
private Boolean enabled = true;
private Selector selector = new NoneSelector();
private String namespaceId;
/**
* IP will be deleted if it has not send beat for some time, default timeout is 30 seconds.
*/
private long ipDeleteTimeout = 30 * 1000;
private volatile long lastModifiedMillis = 0L;
private volatile String checksum;
/**
* TODO set customized push expire time:
*/
private long pushCacheMillis = 0L;
private Map<String, Cluster> clusterMap = new HashMap<>();
//......
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
//......
}
- Service的processClientBeat方法会创建ClientBeatProcessor,并使用HealthCheckReactor进行调度
ClientBeatProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.java
public class ClientBeatProcessor implements Runnable {
public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
private RsInfo rsInfo;
private Service service;
@JSONField(serialize = false)
public PushService getPushService() {
return SpringContext.getAppContext().getBean(PushService.class);
}
public RsInfo getRsInfo() {
return rsInfo;
}
public void setRsInfo(RsInfo rsInfo) {
this.rsInfo = rsInfo;
}
public Service getService() {
return service;
}
public void setService(Service service) {
this.service = service;
}
@Override
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
getPushService().serviceChanged(service);
}
}
}
}
}
}
- ClientBeatProcessor实现了Runnable方法,它会遍历instances更新指定ip及port的instance的lastBeat时间;同时对于非marked且healthy为false的instance更新其healthy为true并通过getPushService().serviceChanged发布变更事件
HealthCheckReactor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java
public class HealthCheckReactor {
private static final ScheduledExecutorService EXECUTOR;
private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
static {
int processorCount = Runtime.getRuntime().availableProcessors();
EXECUTOR
= Executors
.newScheduledThreadPool(processorCount <= 1 ? 1 : processorCount / 2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.health");
return thread;
}
});
}
public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {
task.setStartTime(System.currentTimeMillis());
return EXECUTOR.schedule(task, task.getCheckRTNormalized(), TimeUnit.MILLISECONDS);
}
public static void scheduleCheck(ClientBeatCheckTask task) {
futureMap.putIfAbsent(task.taskKey(), EXECUTOR.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
public static void cancelCheck(ClientBeatCheckTask task) {
ScheduledFuture scheduledFuture = futureMap.get(task.taskKey());
if (scheduledFuture == null) {
return;
}
try {
scheduledFuture.cancel(true);
} catch (Exception e) {
Loggers.EVT_LOG.error("[CANCEL-CHECK] cancel failed!", e);
}
}
public static ScheduledFuture<?> scheduleNow(Runnable task) {
return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
}
}
- HealthCheckReactor在static代码块创建了EXECUTOR,它提供了HealthCheckTask、ClientBeatCheckTask的schedule方法以及ClientBeatCheckTask的cancel方法,并提供了Runnable的scheduleNow方法
小结
- Service的processClientBeat方法会创建ClientBeatProcessor,并使用HealthCheckReactor进行调度
- ClientBeatProcessor实现了Runnable方法,它会遍历instances更新指定ip及port的instance的lastBeat时间;同时对于非marked且healthy为false的instance更新其healthy为true并通过getPushService().serviceChanged发布变更事件
- HealthCheckReactor在static代码块创建了EXECUTOR,它提供了HealthCheckTask、ClientBeatCheckTask的schedule方法以及ClientBeatCheckTask的cancel方法,并提供了Runnable的scheduleNow方法