序
本文主要研究一下nacos的RaftPeerSet
RaftPeerSet
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java
/**
 * Keeps the set of Raft peers known to this node, keyed by peer address, together with the
 * currently recognised leader and the locally stored term.
 *
 * <p>The instance registers itself with {@link ServerListManager} after construction. The
 * listener callbacks required by {@link ServerChangeListener} live in the elided part of the class.
 *
 * <p>NOTE(review): the peer map is a plain {@code HashMap} and {@code leader} is a plain field,
 * while {@link #makeLeader(RaftPeer)} updates peers from an asynchronous HTTP callback. Confirm
 * the threading expectations with the callers before relying on concurrent access.
 */
@Component
@DependsOn("serverListManager")
public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware {

    @Autowired
    private ServerListManager serverListManager;

    private ApplicationContext applicationContext;

    private AtomicLong localTerm = new AtomicLong(0L);

    private RaftPeer leader = null;

    private Map<String, RaftPeer> peers = new HashMap<>();

    private Set<String> sites = new HashSet<>();

    private boolean ready = false;

    public RaftPeerSet() {
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Subscribes this peer set to server list changes.
     */
    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    /**
     * Returns the current leader; in standalone mode the local peer is always reported.
     *
     * @return the leader, or {@code null} when no leader is known yet (cluster mode only)
     */
    public RaftPeer getLeader() {
        if (STANDALONE_MODE) {
            return local();
        }
        return leader;
    }

    /**
     * @return the internal set of sites (not a copy)
     */
    public Set<String> allSites() {
        return sites;
    }

    public boolean isReady() {
        return ready;
    }

    /**
     * Drops the given server addresses from the peer map.
     *
     * @param servers addresses to remove
     */
    public void remove(List<String> servers) {
        for (String server : servers) {
            peers.remove(server);
        }
    }

    /**
     * Stores (or replaces) the given peer under its address.
     *
     * @param peer peer to store
     * @return the same {@code peer} instance
     */
    public RaftPeer update(RaftPeer peer) {
        peers.put(peer.ip, peer);
        return peer;
    }

    /**
     * Tells whether the given address is the current leader. Always {@code true} in standalone mode.
     *
     * @param ip address to test
     * @return {@code true} if {@code ip} equals the leader's address; {@code false} when no leader is known
     */
    public boolean isLeader(String ip) {
        if (STANDALONE_MODE) {
            return true;
        }
        if (leader == null) {
            Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
            return false;
        }
        return StringUtils.equals(leader.ip, ip);
    }

    /**
     * @return the key set view of the peer map (not a copy)
     */
    public Set<String> allServersIncludeMyself() {
        return peers.keySet();
    }

    /**
     * @return a fresh set holding every known peer address except the local one
     */
    public Set<String> allServersWithoutMySelf() {
        Set<String> servers = new HashSet<>(peers.keySet());
        // exclude myself
        servers.remove(local().ip);
        return servers;
    }

    /**
     * @return the values view of the peer map (not a copy)
     */
    public Collection<RaftPeer> allPeers() {
        return peers.values();
    }

    public int size() {
        return peers.size();
    }

    /**
     * Records the given peer (normally a vote response) and recounts the votes of all peers.
     * When one address has collected at least {@link #majorityCount()} votes, the matching peer
     * is marked as LEADER and, if it differs from the current leader, a
     * {@code LeaderElectFinishedEvent} is published.
     *
     * @param candidate peer information to merge into the peer map before counting
     * @return the leader after counting; may be {@code null} or the previous leader when no
     *         address reached the majority
     */
    public RaftPeer decideLeader(RaftPeer candidate) {
        peers.put(candidate.ip, candidate);

        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue;
            }

            ips.add(peer.voteFor);
            if (ips.getCount(peer.voteFor) > maxApproveCount) {
                maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }

        if (maxApproveCount >= majorityCount()) {
            RaftPeer peer = peers.get(maxApprovePeer);
            if (peer == null) {
                // The majority voted for an address that is not in the peer map; there is no
                // peer object to promote, so keep the current leader instead of failing with an NPE.
                Loggers.RAFT.warn("[RAFT] peer {} won the vote but is not in the peer set, ignore it", maxApprovePeer);
                return leader;
            }
            peer.state = RaftPeer.State.LEADER;

            if (!Objects.equals(leader, peer)) {
                leader = peer;
                applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
                Loggers.RAFT.info("{} has become the LEADER", leader.ip);
            }
        }

        return leader;
    }

    /**
     * Accepts the given peer as leader. If it differs from the current leader the field is
     * replaced and a {@code MakeLeaderEvent} is published. Every other peer still marked as
     * LEADER is then re-queried asynchronously via {@code RaftCore.API_GET_PEER}: a successful
     * response replaces the stored peer, a failed one demotes it to FOLLOWER.
     *
     * @param candidate the peer to record as leader
     * @return {@code candidate}, after it has been stored in the peer map
     */
    public RaftPeer makeLeader(RaftPeer candidate) {
        if (!Objects.equals(leader, candidate)) {
            leader = candidate;
            applicationContext.publishEvent(new MakeLeaderEvent(this, leader));
            Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}",
                leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader));
        }

        for (final RaftPeer peer : peers.values()) {
            if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
                // Only build the (empty) parameter map when a request is actually sent.
                Map<String, String> params = new HashMap<>(1);
                try {
                    String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER);
                    HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}",
                                    response.getResponseBody(), peer.ip);
                                peer.state = RaftPeer.State.FOLLOWER;
                                return 1;
                            }

                            update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));

                            return 0;
                        }
                    });
                } catch (Exception e) {
                    peer.state = RaftPeer.State.FOLLOWER;
                    // Pass the exception as the trailing argument so the cause is not lost.
                    Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip, e);
                }
            }
        }

        return update(candidate);
    }

    /**
     * Looks up the peer registered under the local server address. In standalone mode a missing
     * local peer is created on the fly, seeded with the locally stored term, and registered.
     *
     * @return the local peer
     * @throws IllegalStateException if the local peer is unknown and the node is not standalone
     */
    public RaftPeer local() {
        RaftPeer peer = peers.get(NetUtils.localServer());
        if (peer == null && SystemUtils.STANDALONE_MODE) {
            RaftPeer localPeer = new RaftPeer();
            localPeer.ip = NetUtils.localServer();
            localPeer.term.set(localTerm.get());
            peers.put(localPeer.ip, localPeer);
            return localPeer;
        }
        if (peer == null) {
            throw new IllegalStateException("unable to find local peer: " + NetUtils.localServer() + ", all peers: "
                + Arrays.toString(peers.keySet().toArray()));
        }

        return peer;
    }

    /**
     * @param server peer address
     * @return the peer stored under {@code server}, or {@code null} if there is none
     */
    public RaftPeer get(String server) {
        return peers.get(server);
    }

    /**
     * @return the number of votes needed to win: more than half of the known peers
     */
    public int majorityCount() {
        return peers.size() / 2 + 1;
    }

    /**
     * Forgets the current leader and clears the recorded vote of every peer.
     */
    public void reset() {
        leader = null;

        for (RaftPeer peer : peers.values()) {
            peer.voteFor = null;
        }
    }

    public void setTerm(long term) {
        localTerm.set(term);
    }

    public long getTerm() {
        return localTerm.get();
    }

    /**
     * @param remote peer to look for
     * @return {@code true} if a peer is stored under {@code remote}'s address
     */
    public boolean contains(RaftPeer remote) {
        return peers.containsKey(remote.ip);
    }

    //......
}
- RaftPeerSet提供了remove、update、isLeader、allServersIncludeMyself、allServersWithoutMySelf、allPeers、decideLeader、makeLeader、majorityCount、reset等方法
- decideLeader方法会遍历peers,然后使用TreeBag来统计peer.voteFor,当maxApproveCount大于等于majorityCount(),则将对应的peer的state标记为RaftPeer.State.LEADER,然后判断leader是否变更,变更则发布LeaderElectFinishedEvent事件
- makeLeader方法判断candidate与当前leader是否一致,不一致则更新leader为candidate,并发布MakeLeaderEvent事件;然后遍历peers,向非candidate且state为LEADER的节点发送API_GET_PEER请求,请求成功则用返回结果更新该peer在本地的信息,请求失败则将其state更新为RaftPeer.State.FOLLOWER
RaftCore.MasterElection
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
public class MasterElection implements Runnable {
@Override
public void run() {
try {
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
// reset timeout
local.resetLeaderDue();
local.resetHeartbeatDue();
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
public void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
JSON.toJSONString(getLeader()), local.term);
peers.reset();
local.term.incrementAndGet();
local.voteFor = local.ip;
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<>(1);
params.put("vote", JSON.toJSONString(local));
for (final String server : peers.allServersWithoutMySelf()) {
final String url = buildURL(server, API_VOTE);
try {
HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
return 1;
}
RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
peers.decideLeader(peer);
return 0;
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
}
- RaftCore.MasterElection的sendVote方法在请求成功时会执行peers.decideLeader(peer)方法来选举leader
RaftCore.receivedBeat
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
/**
 * Excerpt of the Raft core component. Only {@code receivedBeat} is shown; the rest of the
 * class is elided.
 */
@Component
public class RaftCore {
//......
/**
 * Handles a heartbeat ("beat") sent by a remote peer.
 *
 * <p>The sender is rebuilt from the beat's "peer" object. The beat is rejected unless the
 * sender reports the LEADER state and its term is not lower than the local term. An accepted
 * beat turns the local peer into a FOLLOWER (if it is not one already), resets the local
 * leader and heartbeat timeouts and registers the sender through {@code peers.makeLeader}.
 *
 * <p>Unless {@code switchDomain.isSendBeatOnly()} is true, the beat's "datums" array is then
 * compared with the local {@code datums} map: keys that are missing or older locally are
 * requested from the sender in batches, and local keys that the beat did not list are deleted.
 *
 * @param beat heartbeat payload; read here through its "peer" object and "datums" array
 * @return the local peer
 * @throws IllegalArgumentException if the sender's state is not LEADER, or if the local term
 *         is greater than the sender's term
 * @throws Exception declared by the signature
 */
public RaftPeer receivedBeat(JSONObject beat) throws Exception {
final RaftPeer local = peers.local();
// Rebuild the sender's peer information from the "peer" object of the beat.
final RaftPeer remote = new RaftPeer();
remote.ip = beat.getJSONObject("peer").getString("ip");
remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
remote.voteFor = beat.getJSONObject("peer").getString("voteFor");
// Only a peer that reports itself as LEADER is accepted as a beat sender.
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
remote.state, JSON.toJSONString(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
// Reject a beat whose term is lower than the local term.
if (local.term.get() > remote.term.get()) {
Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
, remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
+ ", beat-to-term: " + local.term.get());
}
// The local peer is not a follower yet: make it one and record the sender as its vote.
if (local.state != RaftPeer.State.FOLLOWER) {
Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
// mk follower
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
final JSONArray beatDatums = beat.getJSONArray("datums");
// Refresh both local timeouts, then register the sender as leader in the peer set.
local.resetLeaderDue();
local.resetHeartbeatDue();
peers.makeLeader(remote);
// Every locally known key starts at 0; keys listed in the beat are set to 1 further down.
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
receivedKeysMap.put(entry.getKey(), 0);
}
// now check datums
List<String> batch = new ArrayList<>();
if (!switchDomain.isSendBeatOnly()) {
int processedCount = 0;
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
}
for (Object object : beatDatums) {
// Counts every beat entry, including the ones skipped below; comparing it with
// beatDatums.size() is how the code detects the last entry.
processedCount = processedCount + 1;
JSONObject entry = (JSONObject) object;
String key = entry.getString("key");
final String datumKey;
if (KeyBuilder.matchServiceMetaKey(key)) {
datumKey = KeyBuilder.detailServiceMetaKey(key);
} else if (KeyBuilder.matchInstanceListKey(key)) {
datumKey = KeyBuilder.detailInstanceListkey(key);
} else {
// ignore corrupted key:
// NOTE(review): this continue also bypasses the batch flush below, so if the final beat
// entry has an unrecognized key, keys already queued in batch are not requested in this call.
continue;
}
long timestamp = entry.getLong("timestamp");
receivedKeysMap.put(datumKey, 1);
try {
// Local copy is at least as new and this is not the last entry: nothing to queue or flush.
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
continue;
}
// Local copy is missing or older: queue the key for fetching.
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}
// Keep accumulating until 50 keys are queued or the last entry has been reached.
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}
String keys = StringUtils.join(batch, ",");
// Last entry reached with nothing queued: no request needed.
if (batch.size() <= 0) {
continue;
}
Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
, getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());
// update datum entry
String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
// A non-200 response ends the callback here; the keys are not re-requested by this code.
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
return 1;
}
List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() {
});
for (JSONObject datumJson : datumList) {
// Each datum is applied while holding OPERATE_LOCK; the finally block releases the lock
// on every path, including the continue statements inside the try.
OPERATE_LOCK.lock();
Datum newDatum = null;
try {
Datum oldDatum = getDatum(datumJson.getString("key"));
// Skip when the received timestamp is not newer than the local one.
if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
continue;
}
// Deserialize the value as Service or Instances depending on which key pattern matches.
if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.key = datumJson.getString("key");
serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
serviceDatum.value =
JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
newDatum = serviceDatum;
}
if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datumJson.getString("key");
instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
instancesDatum.value =
JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
newDatum = instancesDatum;
}
// Neither key pattern matched, or the value could not be produced.
if (newDatum == null || newDatum.value == null) {
Loggers.RAFT.error("receive null datum: {}", datumJson);
continue;
}
// Write through raftStore, update the in-memory map, then queue a CHANGE task on the notifier.
raftStore.write(newDatum);
datums.put(newDatum.key, newDatum);
notifier.addTask(newDatum.key, ApplyAction.CHANGE);
local.resetLeaderDue();
// If local term + 100 exceeds the sender's term, set both the term of the peer returned by
// getLeader() and the local term to the sender's term; otherwise advance the local term by 100.
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
raftStore.updateTerm(local.term.get());
Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);
} catch (Throwable e) {
Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
} finally {
OPERATE_LOCK.unlock();
}
}
// Sleeps 200 ms before the callback returns.
TimeUnit.MILLISECONDS.sleep(200);
return 0;
}
});
// The joined keys string was built before the request was issued, so clearing the list
// only prepares it for the next batch.
batch.clear();
} catch (Exception e) {
// NOTE(review): only the key is logged here; the exception itself is not passed to the logger.
Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
}
}
// Keys still marked 0 exist locally but were not listed in the beat: delete them.
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}
for (String deadKey : deadKeys) {
try {
deleteDatum(deadKey);
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
}
}
}
return local;
}
//......
}
- receivedBeat方法会调用peers.makeLeader(remote)来更新leader
小结
- RaftPeerSet提供了remove、update、isLeader、allServersIncludeMyself、allServersWithoutMySelf、allPeers、decideLeader、makeLeader、majorityCount、reset等方法
- decideLeader方法会遍历peers,然后使用TreeBag来统计peer.voteFor,当maxApproveCount大于等于majorityCount(),则将对应的peer的state标记为RaftPeer.State.LEADER,然后判断leader是否变更,变更则发布LeaderElectFinishedEvent事件
- makeLeader方法判断candidate与当前leader是否一致,不一致则更新leader为candidate,并发布MakeLeaderEvent事件;然后遍历peers,向非candidate且state为LEADER的节点发送API_GET_PEER请求,请求成功则用返回结果更新该peer在本地的信息,请求失败则将其state更新为RaftPeer.State.FOLLOWER