由于时间安排上的原因,这次的代码写得稍微有些简略,只能算是自己对Raft协议的一个巩固。

实现中定义了2个节点,通过读取配置文件来获取IP、端口以及节点ID。

网络部分使用boost.asio的同步接口:一个线程负责接收,一个线程负责发送。

1 接收线程根据收到的数据判断是心跳包、选举请求还是选举请求回复,据此更新自己的逻辑时间编号term、是否已投票isVote,以及当前term中有哪些节点给自己投了选举票:map<int,int> // nodeid, term

2 发送线程每隔200ms轮询一次,根据节点当前状态递减等待时间(等待时间根据节点状态设置为1000ms的心跳间隔,或者1500~5000ms的随机选举超时)

先运行看看效果

我们需要两个节点 所以需要将exe和配置文件放入不同的文件夹 如图

分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP

启动程序

1 初始两个节点都是follower状态 等待leader发送心跳

分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP

2 由于目前无leader 所以两个节点其中之一在随机的超时时间后,发起选举投票

分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP

3 之后节点1 成为leader,以1秒(1000ms)的间隔发送心跳包 进入正常状态

分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP

4 在状态3 的情况下,关闭follower状态的节点2 对整体并无影响。leader节点1 会持续尝试连接follower节点2

分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP

5 节点2 再次上线后,由于leader节点1 发送的心跳term为1,大于新启动节点2的初始term 0,所以节点2 会马上成为follower,接收leader节点1的心跳

分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP

6 在状态3 的情况下,如果关闭的是leader节点1,节点2 在一段时间未接收到心跳后,就会广播选举请求,请求自己成为leader。但是由于没有其他节点给节点2投票,得票无法超过半数,节点2将持续尝试选举自己成为leader

分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP

7 节点1上线后,同意节点2的选举请求,节点2接收超过半数以上的投票,成为leader。开始以1秒间隔发送心跳包。

分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP

代码如下

基础结构体:

 // Shared data types of the Raft election demo: node role, message kind, the
// fixed-size record exchanged between nodes, and the per-node election state.
//
// NOTE(review): the explicit enumerator values were lost from the published
// listing. 0 is assumed for both enums; any value works as long as every node
// is built from the same header, because INFOTYPE travels on the wire as-is.

// Role a node currently plays in the election protocol.
// (The original `const enum ... {};` carried a `const` qualifier that applies
// to no object; standard C++ rejects it, so it is dropped.)
enum STATUS {
    LEADER_STATUS = 0,
    FOLLOWER_STATUS,
    CANDIDATE_STATUS,
    PRE_VOTE_STAUS,  // declared only; nothing in this listing uses it
};

// Kind of message carried by a netInfo record.
enum INFOTYPE {
    DEFAULT_TYPE = 0,
    HEART_BREAT_TYPE,       // leader -> peers heartbeat
    VOTE_LEADER_TYPE,       // candidate -> peers vote request
    VOTE_LEADER_RESP_TYPE,  // peer -> candidate vote reply
};

// Record sent between nodes. It is written to the socket as raw bytes
// (sizeof(netInfo)), so it must stay a plain aggregate of trivially copyable
// members and is deliberately left without default member initializers.
struct netInfo {
    int fromID;         // sender node id
    int toID;           // intended receiver node id
    INFOTYPE infotype;  // message kind
    int term;           // sender's term
    int voteId;         // id being voted for; only meaningful for vote messages
};
using NetInfo = netInfo;

// Election state of the local node.
// Every member has a default so a freshly constructed state is well defined
// even before RaftManager::Init() runs (Init never assigned `term`).
struct locaInfo {
    int id = 0;                      // this node's id
    int leaderID = 0;                // id of the known leader
    STATUS status = FOLLOWER_STATUS; // current role
    int term = 0;                    // current term
    int isVote = 0;                  // vote flag, reset whenever a newer term is adopted
    int IsRecvHeartbeat = 0;         // set by the receive path, consumed by the poll loop
    int electionTimeout = 0;         // countdown in ms, decremented by the poll loop
    std::map<int, int> voteRecord;   // id -> term: `id` voted for us in `term`
};
using LocalInfo = locaInfo;

// LocalInfo together with the mutex that must be held while touching it.
struct localInfoWithLock {
    LocalInfo locInfo;
    std::mutex m;
};
using LocalInfoWithLock = localInfoWithLock;

基本数据结构

 #include "RaftManager.h"
#include "NetInfoHandler.h"
#include "StatusHandler.h"

#include <cassert>
#include <chrono>
#include <exception>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <random>
#include <string>
#include <thread>

// NOTE(review): every numeric literal of the published listing was stripped.
// The values below are reconstructed from the accompanying article (200 ms
// poll interval, 1000 ms heartbeat, 1500-5000 ms random election timeout,
// initial term 0) or are the only sensible choice (0 / true). The reconnect
// delay is a guess. Confirm all of them against the original project.
namespace {

constexpr int kPollIntervalMs = 200;         // LoopCheck tick
constexpr int kDefaultHeartbeatMs = 1000;    // used when nodeCfg has no heartbeatTime
constexpr int kElectionTimeoutMinMs = 1500;  // lower bound of the random timeout
constexpr int kElectionTimeoutMaxMs = 5000;  // upper bound of the random timeout
constexpr int kReconnectDelayMs = 200;       // pause between connect attempts (guess)
constexpr int kFirstNodeId = 1;              // ids run from 1 to NODE_COUNT; 0 means "unset"

// Dumps one message to stdout; `title` is the header line ("Send netinfo" / "recv netinfo").
void PrintNetInfo(const char* title, const netInfo& info) {
    std::cout << "\n==========================================================>" << std::endl;
    std::cout << title << std::endl;
    std::cout << "netinf.fromID = " << info.fromID << std::endl;
    std::cout << "netinf.toID = " << info.toID << std::endl;
    std::cout << "netinf.infotype = " << info.infotype << std::endl;
    std::cout << "netinf.term = " << info.term << std::endl;
    std::cout << "netinf.voteId = " << info.voteId << std::endl << std::endl;
    std::cout << "<==========================================================" << std::endl;
}

}  // namespace

std::shared_ptr<RaftManager> RaftManager::p = nullptr;

// Loads "nodeCfg" and resets the local election state to "follower, term 0".
// Returns false when a mandatory key (ip, portStart, nodeID) is missing or a
// numeric value cannot be parsed.
bool RaftManager::Init() {
    // A JSON based configuration reader could be used here instead.
    ReadConfig cfg("nodeCfg");
    std::map<std::string, std::string> kv = cfg.Do();

    if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() ||
        kv.find("nodeID") == kv.end()) {
        std::cerr << __FUNCTION__ << " : nodeCfg must define ip, portStart and nodeID" << std::endl;
        assert(false);
        return false;
    }

    try {
        ip = kv["ip"];
        portStart = std::stoi(kv["portStart"]);
        nodeID = std::stoi(kv["nodeID"]);
        // NOTE(review): heartbeatTime is stored but not read anywhere in this listing.
        heartbeatTime = kDefaultHeartbeatMs;
        const auto hb = kv.find("heartbeatTime");
        if (hb != kv.end())
            heartbeatTime = std::stoi(hb->second);
    }
    catch (const std::exception& e) {
        // std::stoi throws on non-numeric / out-of-range text; report instead of escaping.
        std::cerr << __FUNCTION__ << " : bad value in nodeCfg: " << e.what() << std::endl;
        return false;
    }

    // Random election timeout generator shared with the status/message handlers.
    std::random_device rd;
    std::default_random_engine engine(rd());
    std::uniform_int_distribution<> dis(kElectionTimeoutMinMs, kElectionTimeoutMaxMs);
    dice = std::bind(dis, engine);

    locInfolock.locInfo.id = nodeID;
    locInfolock.locInfo.leaderID = 0;
    locInfolock.locInfo.term = 0;  // was never assigned by the original Init
    locInfolock.locInfo.IsRecvHeartbeat = 0;
    locInfolock.locInfo.isVote = 0;
    // The original start value was lost; a random timeout keeps the two nodes
    // from starting their first election at the same moment.
    locInfolock.locInfo.electionTimeout = dice();
    locInfolock.locInfo.status = FOLLOWER_STATUS;
    locInfolock.locInfo.voteRecord.clear();
    return true;
}

// Sender thread for peer `sendId`: connects to 127.0.0.1:(portStart + sendId),
// then forwards every message taken from the queue. On a write error the
// connection is re-established; the message that failed is dropped.
//
// NOTE(review): every sender thread takes from the same queue without looking
// at netinfo.toID. That is only correct while there is a single peer
// (NODE_COUNT == 2); with more nodes messages can reach the wrong peer.
void RaftManager::SendFunc(int sendId) {
    tcp::socket peer(io_service);
    tcp::resolver resolver(io_service);
    const std::string port = std::to_string(portStart + sendId);

    while (true) {
        try {
            boost::asio::connect(peer, resolver.resolve({ "127.0.0.1", port }));
        }
        catch (const std::exception&) {
            // Peer not reachable yet. The original slept *after* `continue`,
            // i.e. never, and retried in a busy loop.
            std::this_thread::sleep_for(std::chrono::milliseconds(kReconnectDelayMs));
            continue;
        }

        netInfo netinfo;
        while (true) {
            q.Take(netinfo);
            boost::system::error_code writeError;
            boost::asio::write(peer, boost::asio::buffer(&netinfo, sizeof(netinfo)), writeError);
            if (writeError) {
                std::cerr << boost::system::system_error(writeError).what() << std::endl;
                break;  // back to the outer loop to reconnect
            }
            PrintNetInfo("Send netinfo", netinfo);
        }
    }
}

// Poll loop: lets the StatusHandler act on the current role once per tick.
// Never returns.
void RaftManager::LoopCheck(LocalInfoWithLock& locInfolock) {
    StatusHandler handler;
    while (true) {
        handler.DiapatchByStatus(locInfolock, q);
        std::this_thread::sleep_for(std::chrono::milliseconds(kPollIntervalMs));
    }
}

// Receiver thread for one accepted connection: reads fixed-size NetInfo
// records and hands each one to a NetInfoHandler. Returns when the peer
// closes the connection or any read error occurs.
void RaftManager::RecvNetInfo(tcp::socket sock) {
    NetInfo netinf;
    for (;;) {
        boost::system::error_code error;
        // boost::asio::read blocks until the whole record has arrived.
        // read_some may return a partial record, after which the byte stream
        // would stay misaligned for the rest of the connection.
        boost::asio::read(sock, boost::asio::buffer(&netinf, sizeof(netinf)), error);
        if (error == boost::asio::error::eof)
            return;  // connection closed by the peer
        if (error) {
            std::cerr << boost::system::system_error(error).what() << std::endl;
            return;
        }

        PrintNetInfo("recv netinfo", netinf);

        NetInfoHandler handler;
        handler.DispatchByinfoType(netinf, q, locInfolock);
    }
}

// Starts one sender thread per peer and the poll thread, then accepts
// incoming connections forever, spawning a receiver thread for each.
// Returns false when Init() has not produced a usable configuration or when
// setting up / accepting throws; it does not return otherwise.
bool RaftManager::Go() {
    if (ip == "" || portStart == 0 || nodeID == 0)
        return false;
    try {
        for (int peerId = kFirstNodeId; peerId <= NODE_COUNT; peerId++) {
            if (peerId == nodeID)
                continue;
            std::thread(&RaftManager::SendFunc, shared_from_this(), peerId).detach();
        }

        std::thread(&RaftManager::LoopCheck, shared_from_this(), std::ref(locInfolock)).detach();

        const int port = portStart + nodeID;
        tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));

        for (;;) {
            tcp::socket sock(io_service);
            a.accept(sock);
            std::cout << "accept\n";
            std::thread(&RaftManager::RecvNetInfo, shared_from_this(), std::move(sock)).detach();
        }
    }
    catch (const std::exception& e) {
        std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
        return false;
    }
    return true;
}

读取配置文件和开启网络连接的代码

 #include "StatusHandler.h"
#include "RaftManager.h"
#include <iostream>
#include <mutex>

// NOTE(review): every numeric literal of the published listing was stripped.
// The values below are reconstructed from the accompanying article (200 ms
// poll interval, 1000 ms heartbeat) or are the only sensible choice (0 / 1).
// Confirm them against the original project.
namespace {

constexpr int kTickMs = 200;                // must equal the sleep used by RaftManager::LoopCheck
constexpr int kHeartbeatIntervalMs = 1000;  // delay between two leader heartbeats
constexpr int kFirstNodeId = 1;             // ids run from 1 to NODE_COUNT

// Advances the countdown by one poll tick.
void Tick(LocalInfo& info) {
    if (info.electionTimeout > 0)
        info.electionTimeout -= kTickMs;
}

// Queues one message of `type` for every node except `fromId`.
void Broadcast(SyncQueue<netInfo>& q, int fromId, INFOTYPE type, int term, int voteId) {
    for (int peer = kFirstNodeId; peer <= NODE_COUNT; ++peer) {
        if (peer == fromId)
            continue;
        netInfo msg{ fromId, peer, type, term, voteId };
        q.Put(msg);
    }
}

// Bumps the term, votes for ourselves and re-arms the election timer.
void StartElection(LocalInfo& info) {
    info.term++;
    info.status = CANDIDATE_STATUS;
    info.voteRecord.clear();
    info.voteRecord[info.id] = info.term;
    info.electionTimeout = dice();
}

}  // namespace

// Called once per poll tick: runs the handler that matches the node's role.
void StatusHandler::DiapatchByStatus(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
    STATUS current;
    {
        // Only the role is needed; copying the whole LocalInfo (with its map)
        // under the lock was unnecessary.
        std::lock_guard<std::mutex> lck(locInfolock.m);
        current = locInfolock.locInfo.status;
    }

    // The role may change between this read and the handler taking the lock
    // again, so every handler re-checks it under its own lock.
    switch (current) {
    case LEADER_STATUS:
        HandleLeaderSend(locInfolock, q);
        break;
    case FOLLOWER_STATUS:
        HandleFollowerSend(locInfolock, q);
        break;
    case CANDIDATE_STATUS:
        HandleCandidateSend(locInfolock, q);
        break;
    default:
        std::cerr << "Unknown status!" << std::endl;
    }
}

// Leader tick: when the heartbeat countdown expires, queue a heartbeat for
// every peer and restart the countdown.
void StatusHandler::HandleLeaderSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
    bool isSendheartbeat = false;
    int nodeid = 0;
    int term = 0;
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        LocalInfo& info = locInfolock.locInfo;
        if (info.status != LEADER_STATUS)
            return;  // stepped down since the dispatch
        Tick(info);
        if (info.electionTimeout <= 0) {
            isSendheartbeat = true;
            nodeid = info.id;
            term = info.term;
            info.electionTimeout = kHeartbeatIntervalMs;
        }
    }
    // Queue outside the lock so the receive path is never blocked by q.Put.
    if (isSendheartbeat)
        Broadcast(q, nodeid, HEART_BREAT_TYPE, term, 0);
}

// Follower tick: a heartbeat seen since the last tick re-arms the election
// timer; otherwise, once the timer has run out, the node becomes a candidate
// and queues a vote request for every peer.
void StatusHandler::HandleFollowerSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
    bool isSendVoteNetInfo = false;
    int nodeid = 0;
    int term = 0;
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        LocalInfo& info = locInfolock.locInfo;
        if (info.status != FOLLOWER_STATUS)
            return;  // role changed since the dispatch
        Tick(info);

        if (info.IsRecvHeartbeat == 1) {
            // The original only re-armed the timer while it was still > 0.
            // When it expired on the same tick a heartbeat was pending, the
            // flag was cleared but the timer stayed expired, so the node
            // started an election one tick later although the leader was alive.
            std::cout << "Check hearbeat OK!!! Clear electionTimeout" << std::endl;
            info.IsRecvHeartbeat = 0;
            info.electionTimeout = dice();
        }
        else if (info.electionTimeout <= 0) {
            // Heartbeat timed out: switch to election mode.
            std::cout << "electionTimeout .change to CANDIDATE_STATUS" << std::endl;
            StartElection(info);
            isSendVoteNetInfo = true;
            term = info.term;
            nodeid = info.id;
        }
    }
    if (isSendVoteNetInfo)
        Broadcast(q, nodeid, VOTE_LEADER_TYPE, term, nodeid);
}

// Candidate tick: when the election timer expires without a winner, start a
// new election in the next term and queue vote requests again.
void StatusHandler::HandleCandidateSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
    bool isSendVoteNetInfo = false;
    int nodeid = 0;
    int term = 0;
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        LocalInfo& info = locInfolock.locInfo;
        // Without this re-check a candidate that won the election between the
        // dispatch and this lock (its electionTimeout is then <= 0) would be
        // demoted straight back to candidate with a higher term.
        if (info.status != CANDIDATE_STATUS)
            return;

        if (info.IsRecvHeartbeat == 1) {
            // A heartbeat was accepted while we were campaigning, so a leader
            // exists for this term: fall back to follower. The original kept
            // re-sending the same-term vote request forever in this state.
            info.status = FOLLOWER_STATUS;
            info.IsRecvHeartbeat = 0;
            info.voteRecord.clear();
            info.electionTimeout = dice();
            return;
        }

        Tick(info);
        if (info.electionTimeout <= 0) {
            std::cout << "electionTimeout .CANDIDATE_STATUS too" << std::endl;
            StartElection(info);
            isSendVoteNetInfo = true;
            term = info.term;
            nodeid = info.id;
        }
    }
    if (isSendVoteNetInfo)
        Broadcast(q, nodeid, VOTE_LEADER_TYPE, term, nodeid);
}

每间隔200毫秒进行一次状态检测与切换,并在超时后发送心跳或选举请求的代码

 #include "NetInfoHandler.h"
#include "RaftManager.h"
#include <iostream>
#include <mutex>

// NOTE(review): every numeric literal of the published listing was stripped.
// The flags are restored as 0 / 1, the majority divisor as 2 and the
// "no vote" id as 0 (node ids start at 1). Confirm against the original project.
namespace {

// Term rule shared by every handler; the caller must hold the state mutex.
// Returns false for a message from an older term (to be ignored). A message
// from a newer term makes the node adopt that term as a follower with a
// fresh election timer. The original repeated this block four times and
// re-armed the timer in only one of them.
bool SyncTerm(const NetInfo& netinf, LocalInfo& info) {
    if (netinf.term < info.term)
        return false;
    if (netinf.term > info.term) {
        info.term = netinf.term;
        info.status = FOLLOWER_STATUS;
        info.isVote = 0;
        info.IsRecvHeartbeat = 0;
        info.electionTimeout = dice();
        info.voteRecord.clear();
    }
    return true;
}

}  // namespace

// Entry point of the receive path: drops stale messages, adopts newer terms,
// then runs the handler for the message kind.
void NetInfoHandler::DispatchByinfoType(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        if (!SyncTerm(netinf, locInfolock.locInfo))
            return;
    }
    switch (netinf.infotype) {
    case HEART_BREAT_TYPE:
        HandleHeartBeatTypeRecv(netinf, q, locInfolock);
        break;
    case VOTE_LEADER_TYPE:
        HandleVoteTypeRecv(netinf, q, locInfolock);
        break;
    case VOTE_LEADER_RESP_TYPE:
        HandleVoteRespTypeRecv(netinf, q, locInfolock);
        break;
    default:
        std::cerr << "Recv Unknown info type." << std::endl;
    }
}

// Vote reply: records a vote granted to this node and promotes the node to
// leader once more than half of the nodes voted for it in the current term.
void NetInfoHandler::HandleVoteRespTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    LocalInfo& info = locInfolock.locInfo;
    if (!SyncTerm(netinf, info))
        return;
    // Only a candidate can win an election. The original tallied votes in any
    // role, so a late reply reaching a leader reset its heartbeat countdown.
    if (info.status != CANDIDATE_STATUS)
        return;

    if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == info.id && netinf.voteId == info.id) {
        // Remember that the sender voted for us in this term.
        info.voteRecord[netinf.fromID] = netinf.term;
    }

    // Count the votes that belong to the current term.
    int count = 0;
    for (const auto& vote : info.voteRecord) {
        if (vote.second == info.term)
            ++count;
    }

    if (count > NODE_COUNT / 2) {
        // Majority reached: become leader. Otherwise the election continues.
        info.leaderID = info.id;
        info.IsRecvHeartbeat = 0;
        info.status = LEADER_STATUS;
        info.electionTimeout = 0;  // makes the next poll tick send a heartbeat at once
        std::cout << "I am the leader term = " << info.term << std::endl;
    }
}

// Vote request: a follower answers every request. The reply carries the
// candidate's id when the vote is granted (first request of the term) and 0
// when this node has already voted. Leaders and candidates do not answer.
void NetInfoHandler::HandleVoteTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
    NetInfo respNetInfo{};
    bool isSend = false;
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        LocalInfo& info = locInfolock.locInfo;
        if (!SyncTerm(netinf, info))
            return;
        if (info.status == FOLLOWER_STATUS) {
            respNetInfo.fromID = info.id;
            respNetInfo.toID = netinf.fromID;
            respNetInfo.term = netinf.term;
            respNetInfo.infotype = VOTE_LEADER_RESP_TYPE;
            if (info.isVote == 0) {
                respNetInfo.voteId = netinf.voteId;  // grant
                info.isVote = 1;
            }
            else {
                respNetInfo.voteId = 0;  // already voted in this term: deny
            }
            isSend = true;
        }
    }
    // Queue outside the lock so the poll thread is never blocked by q.Put.
    if (isSend)
        q.Put(respNetInfo);
}

// Heartbeat: marks that the leader was heard from so the poll loop re-arms
// the election timer, and remembers who the leader is.
void NetInfoHandler::HandleHeartBeatTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    LocalInfo& info = locInfolock.locInfo;
    if (!SyncTerm(netinf, info))
        return;

    if (info.status == CANDIDATE_STATUS) {
        // A heartbeat for our own term means another node already won this
        // term's election: stop campaigning. The original stayed a candidate.
        info.status = FOLLOWER_STATUS;
        info.voteRecord.clear();
        info.electionTimeout = dice();
    }
    if (info.status != LEADER_STATUS)
        info.leaderID = netinf.fromID;  // the original never recorded the leader on followers
    info.IsRecvHeartbeat = 1;
}

收到信息,进行处理以及发送告知自己状态改变的代码

分布式协议学习笔记(三) Raft 选举自编写代码练习-LMLPHP

04-16 18:57