序
本文主要研究一下nacos的DataSyncer
DataSyncer
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java
@Component
@DependsOn("serverListManager")
public class DataSyncer {
@Autowired
private DataStore dataStore;
@Autowired
private GlobalConfig partitionConfig;
@Autowired
private Serializer serializer;
@Autowired
private DistroMapper distroMapper;
@Autowired
private ServerListManager serverListManager;
private Map<String, String> taskMap = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
startTimedSync();
}
public void submit(SyncTask task, long delay) {
// If it's a new task:
if (task.getRetryCount() == 0) {
Iterator<String> iterator = task.getKeys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
// associated key already exist:
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync already in process, key: {}", key);
}
iterator.remove();
}
}
}
if (task.getKeys().isEmpty()) {
// all keys are removed:
return;
}
GlobalExecutor.submitDataSync(new Runnable() {
@Override
public void run() {
try {
if (getServers() == null || getServers().isEmpty()) {
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
return;
}
List<String> keys = task.getKeys();
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync keys: {}", keys);
}
Map<String, Datum> datumMap = dataStore.batchGet(keys);
if (datumMap == null || datumMap.isEmpty()) {
// clear all flags of this task:
for (String key : task.getKeys()) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
return;
}
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
boolean success = NamingProxy.syncData(data, task.getTargetServer());
if (!success) {
SyncTask syncTask = new SyncTask();
syncTask.setKeys(task.getKeys());
syncTask.setRetryCount(task.getRetryCount() + 1);
syncTask.setLastExecuteTime(timestamp);
syncTask.setTargetServer(task.getTargetServer());
retrySync(syncTask);
} else {
// clear all flags of this task:
for (String key : task.getKeys()) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
}
} catch (Exception e) {
Loggers.DISTRO.error("sync data failed.", e);
}
}
}, delay);
}
public void retrySync(SyncTask syncTask) {
Server server = new Server();
server.setIp(syncTask.getTargetServer().split(":")[0]);
server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
if (!getServers().contains(server)) {
// if server is no longer in healthy server list, ignore this task:
return;
}
// TODO may choose other retry policy.
submit(syncTask, partitionConfig.getSyncRetryDelay());
}
public void startTimedSync() {
GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
}
//......
public List<Server> getServers() {
return serverListManager.getHealthyServers();
}
public String buildKey(String key, String targetServer) {
return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
}
}
- DataSyncer定义了submit、retrySync、startTimedSync、getServers等方法,其init方法会执行startTimedSync
- submit方法对于retryCount为0的任务会判断taskMap是否存在该任务如果存在则移除其taskKey,之后使用GlobalExecutor.submitDataSync提交一个sync任务,它主要是通过NamingProxy.syncData来同步,成功则移除,不成功则使用retrySync重试
- retrySync则重新构建server调用submit执行;startTimedSync方法则是使用GlobalExecutor.schedulePartitionDataTimedSync提交TimedSync任务;getServers则通过serverListManager.getHealthyServers()返回健康的实例
TimedSync
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DataSyncer.java
public class TimedSync implements Runnable {
@Override
public void run() {
try {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", getServers());
}
// send local timestamps to other servers:
Map<String, String> keyChecksums = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
continue;
}
keyChecksums.put(key, dataStore.get(key).value.getChecksum());
}
if (keyChecksums.isEmpty()) {
return;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
}
for (Server member : getServers()) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
NamingProxy.syncCheckSums(keyChecksums, member.getKey());
}
} catch (Exception e) {
Loggers.DISTRO.error("timed sync task failed.", e);
}
}
}
- TimedSync会使用NamingProxy.syncCheckSums同步keyChecksums进行校验
小结
- DataSyncer定义了submit、retrySync、startTimedSync、getServers等方法,其init方法会执行startTimedSync
- submit方法对于retryCount为0的任务会判断taskMap是否存在该任务如果存在则移除其taskKey,之后使用GlobalExecutor.submitDataSync提交一个sync任务,它主要是通过NamingProxy.syncData来同步,成功则移除,不成功则使用retrySync重试
- retrySync则重新构建server调用submit执行;startTimedSync方法则是使用GlobalExecutor.schedulePartitionDataTimedSync提交TimedSync任务;getServers则通过serverListManager.getHealthyServers()返回健康的实例