学习 https://github.com/huoyu820125/SecondPaxos ,并自己编写网络版本。

在学习过程中,将此代码的线程、锁等改用 C++11 标准库实现,这样就不用包含那么多文件。

主要更改如下

 // MyPaxos.cpp: 定义控制台应用程序的入口点。
// #include "stdafx.h"
#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>
#include "Acceptor.h"
#include "Proposer.h" paxos::Proposer p[];
// NOTE(review): numeric literals seem to have been dropped from this listing
// (empty array bounds, `= ;`, `rand() % `, `i < ;` ...); they must be restored
// before the code can compile.
// Acceptors shared by every proposer thread.
paxos::Acceptor a[];
// Not referenced anywhere else in the visible listing.
int finishedCount = ;
// l[i] is held around each call into a[i]; printlock is held around each console write.
//
// Proposer(id): thread body that drives p[id]. Each pass of the outer loop runs a
// Propose phase (a[i].Propose() on one acceptor after another) and, if
// proposer.StartAccept() is true afterwards, an Accept phase (Accept() on the
// acceptors recorded in acceptorId). Returns once proposer.IsAgree() is true.
std::mutex l[]; std::mutex printlock; void Proposer(int id) {
paxos::Proposer &proposer = p[(int)id];
paxos::PROPOSAL value = proposer.GetProposal();
// lastValue is passed to a[i].Propose() and then to proposer.Proposed();
// acceptorId/count list the acceptors whose reply was taken in the current Propose phase.
paxos::PROPOSAL lastValue; int acceptorId[];
int count = ; while (true) {
value = proposer.GetProposal();// fetch the current proposal
printlock.lock();
std::cout << "Proposer" << (int)id << "号开始(Propose阶段):提议=[编号:" << value.serialNum
<< ",提议:" << value.value << "]\n";
printlock.unlock();
count = ;
int i = ; for (i = ; i < ; i++)
{
/*
* Send the message to the i-th acceptor
* It takes some time to reach the acceptor; sleep(random time) simulates this
* The acceptor handles the message: mAcceptors[i].Propose()
* It replies to the proposer
* It takes some time for the proposer to receive the reply; sleep(random time) simulates this
* The proposer handles the reply: mProposer.proposed(ok, lastValue)
*/
// after a random delay the message reaches mAcceptors[i]
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % ) );
l[i].lock();
bool ok = a[i].Propose(value.serialNum, lastValue);
l[i].unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % ));
// handle the Propose reply
if (!proposer.Proposed(ok, lastValue)) // restart the Propose phase
{
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % ));
// to reduce livelock, wait a little longer so other proposers get a chance to finish their own two-phase approval
break;
}
paxos::PROPOSAL curValue = proposer.GetProposal();// fetch the current proposal
if (curValue.value != value.value)// this acceptor reply may have recommended a proposal
{
printlock.lock();
std::cout << "Proposer" << (int)id << "号修改了提议:提议=[编号:" <<
curValue.serialNum << ",提议:" << curValue.value << "]\n";
printlock.unlock();
}
acceptorId[count++] = i;// record the acceptor that is willing to vote
// Once StartAccept() reports true, the Propose loop is left early whenever the
// random test below passes (its constants are missing from the listing).
if (proposer.StartAccept())
{
if ( == rand() % ) break;
}
}
// check whether the condition for starting Accept is met; if not, the Propose phase must restart
if (!proposer.StartAccept()) continue;
// start the Accept phase
// send the Accept message to every acceptor that is willing to vote
value = proposer.GetProposal();
printlock.lock();
std::cout << "Proposer" << (int)id << "号开始(Accept阶段):提议=[编号:" <<
value.serialNum << ",提议:" << value.value << "]\n";
printlock.unlock();
for (i = ; i < count; i++)
{
// send the accept message to the acceptor
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % ));// after a random delay the accept message reaches the acceptor
// handle the accept message
l[acceptorId[i]].lock();
bool ok = a[acceptorId[i]].Accept(value);
l[acceptorId[i]].unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % ));// after a random delay the accept reply reaches the proposer
// handle the accept reply
if (!proposer.Accepted(ok)) // restart the Propose phase
{
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % ));// to reduce livelock, wait a little longer so other proposers get a chance to finish their own two-phase approval
break;
}
if (proposer.IsAgree())// the proposal was approved successfully
{
printlock.lock();
std::cout << "Proposer" << (int)id << "号批准了提议:最终提议 = [编号:" <<
value.serialNum << ",提议:" << value.value << "]\n";
printlock.unlock();
return ;
}
}
}
// Not reached: the loop above is only left through the `return` inside it.
return ;
} int main()
{
// Entry point of the in-process simulation: prints a banner, initialises every
// proposer in p[] (SetPlayerCount + StartPropose), runs Proposer(i) on one thread
// per proposer, joins them all, and then idles in an endless sleep loop.
// NOTE(review): loop bounds, array sizes and the SetPlayerCount arguments are
// missing from this listing; the banner text suggests 11 proposers and
// 11 acceptors - confirm against the original project.
int i = ;
std::cout << "11个Proposer, 11个Acceptor准备进行Paxos\n"<<
"每个Proposer独立线程,Acceptor不需要线程\n"<<
"Proposer线程中等待随机时间:表示与Acceptor的通信时间\n"<<
"Proposer线程中调用Acceptor.Proposed()表示拿到了Propose请求结果\n"<<
"Proposer线程中调用Acceptor.Accepted()表示拿到了Accept请求结果\n"<<
"Proposer被批准后结束线程,其它线程继续投票最终,全部批准相同的值,达成一致。\n"; paxos::PROPOSAL value;
for (i = ; i < ; i++)
{
p[i].SetPlayerCount(, );
// Each proposer starts with a serial number and a value that are equal and derived from i.
value.serialNum = value.value = i + ;
p[i].StartPropose(value);
} std::thread t[];
for (i = ; i < ; i++) {
t[i] = std::thread(Proposer, i);
}
for (i = ; i < ; i++) {
t[i].join();
}
// Keeps the process alive after all proposer threads have finished;
// the `return` after this loop is never reached.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds());
} return ;
}

20180513添加

根据视频  paxos和分布式系统_1024x768_2.00M_h.264

添加自写代码 vs2017 boost1.65编译

方案1:单点接受、多点提交,二段提交,抢占提交权

// Accepter.cpp: 定义控制台应用程序的入口点。
// #include "stdafx.h"
#include <ctime>
#include <iostream>
#include <string>
#include <memory>
#include <mutex>
#include <thread>
#include <boost/asio.hpp> using boost::asio::ip::tcp; const int default_port = ; #pragma pack (1)
// Proposal data structure: the request that HandlePropose reads from the socket.
// NOTE(review): numeric literals (port, enum values, the `-…` initialisers) are
// missing from this listing, and statements that presumably stood on separate
// lines are fused (e.g. after `#pragma pack()`); restore both before compiling.
typedef struct PROPOSAL
{
int index; // phase of the current request
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}PROPOSAL; typedef struct ACCEPTSAL
{
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}ACCEPTSAL;
// ACCEPTSAL above is the reply layout: HandlePropose writes a copy of g_ServerRecord in it.
// INDEX: request phases compared against PROPOSAL::index.
#pragma pack() enum INDEX {
INIT_INDEX = ,
PREPARE_INDEX,
ACCEPT_INDEX,
FINISH_INDEX,
ERROR_INDEX = -,
}; int current_index = PREPARE_INDEX;
// The single acceptor record shared by all connection threads; accessed under g_mtx.
ACCEPTSAL g_ServerRecord = {-,- };
boost::asio::io_service io_service;
std::mutex g_mtx;
// Not referenced anywhere else in this program's visible code.
int acceptepoch = -;
// HandlePropose(socket): per-connection loop, started on a detached thread by main.
// Repeatedly reads one PROPOSAL, updates the shared g_ServerRecord under g_mtx
// according to the request phase, and writes the resulting record back.
// The loop ends when a socket operation throws; the message goes to std::cerr.
// NOTE(review): the signature below shares its line with the `//====` separator,
// so as written it is part of that comment; it presumably belongs on its own line.
//========================================================== void HandlePropose(std::shared_ptr<tcp::socket> socket) {
PROPOSAL buf = { -,-,- };
ACCEPTSAL tmpAcpsal = { -,- }; try {
while () {
// Receive one complete PROPOSAL (sizeof(buf) bytes).
boost::asio::read(*socket, boost::asio::buffer(&buf, sizeof(buf)));
{
// g_mtx is held while the shared record is examined and updated.
std::lock_guard<std::mutex> lock(g_mtx);
// prepare phase
if (buf.index == PREPARE_INDEX) {
if (g_ServerRecord.epoch <= buf.epoch && g_ServerRecord.value == -) {
// remember the newest prepare epoch
g_ServerRecord.epoch = buf.epoch;
current_index = ACCEPT_INDEX;
}
}
// accept phase: only considered while no value has been stored yet
else if (buf.index == ACCEPT_INDEX && (- == g_ServerRecord.value)) {
if ((buf.epoch >= g_ServerRecord.epoch)) {
g_ServerRecord.value = buf.value;
current_index = FINISH_INDEX;
}
}
// copy the acceptor's record
tmpAcpsal = g_ServerRecord;
}
// reply (sent after the lock has been released)
boost::asio::write(*socket, boost::asio::buffer(&tmpAcpsal, sizeof(tmpAcpsal)));
}
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
return;
}
} int main()
{
// Entry point of the acceptor: listens on default_port (IPv4) and, for every
// accepted connection, starts a detached thread running HandlePropose on it.
// An exception ends the accept loop and is reported on std::cerr.
try {
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), default_port));
for (;;) {
// The socket is held by shared_ptr so the handler thread keeps it alive.
std::shared_ptr<tcp::socket> psocket = std::make_shared<tcp::socket>(io_service);
acceptor.accept(*psocket); std::thread t(HandlePropose,psocket);
t.detach();
}
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
} return ;
}

accepter

// Proposer.cpp: 定义控制台应用程序的入口点。
// #include "stdafx.h"
#include <ctime>
#include <iostream>
#include <string>
#include <memory>
#include <mutex>
#include <thread>
#include <boost/asio.hpp> using boost::asio::ip::tcp; const std::string default_port = ""; #pragma pack (1)
// Proposal data structure: the request this proposer writes to the socket.
// NOTE(review): numeric literals (enum values, port string) are missing from this
// listing and some line breaks appear to have been lost; restore before compiling.
typedef struct PROPOSAL
{
int index; // phase of the current request
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}PROPOSAL; typedef struct ACCEPTSAL
{
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}ACCEPTSAL;
// ACCEPTSAL above is the reply layout read back from the acceptor.
// INDEX: request phases stored in PROPOSAL::index.
#pragma pack() enum INDEX {
INIT_INDEX = ,
PREPARE_INDEX,
ACCEPT_INDEX,
FINISH_INDEX,
// NOTE(review): on the closing line below, `int main()` follows the `//====`
// separator and is therefore inside that comment as written.
ERROR_INDEX = -,
}; //======================================================== int main()
{
// Entry point of the single proposer: connects to 127.0.0.1:default_port, sends a
// PREPARE_INDEX request and, when the reply echoes the same epoch, an ACCEPT_INDEX
// request; it then prints the value from the last reply and returns.
try {
boost::asio::io_service io_service; tcp::resolver resolver(io_service);
tcp::resolver::query query("127.0.0.1", default_port.c_str());
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); tcp::socket socket(io_service);
// Both branches of the final if/else below return, so the loop body runs at most once.
boost::asio::connect(socket, endpoint_iterator); for (;;) {
PROPOSAL pro = { PREPARE_INDEX ,, };
ACCEPTSAL tmpAcpsal = { -,- };
// NOTE(review): read_some() may deliver fewer than sizeof(tmpAcpsal) bytes and `len`
// is never checked; the error_code set by write() is overwritten by read_some()
// before it is inspected. Consider boost::asio::read() - confirm.
boost::system::error_code error; boost::asio::write(socket, boost::asio::buffer(&pro,sizeof(pro)), error); size_t len = socket.read_some(boost::asio::buffer(&tmpAcpsal,sizeof(tmpAcpsal)), error); if (error == boost::asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw boost::system::system_error(error); // Some other error.
// The reply carries our epoch: resend the same proposal as an accept request.
if (tmpAcpsal.epoch == pro.epoch) {
pro.index = ACCEPT_INDEX;
boost::asio::write(socket, boost::asio::buffer(&pro, sizeof(pro)), error);
// `error` is not inspected after this write/read pair.
size_t len = socket.read_some(boost::asio::buffer(&tmpAcpsal, sizeof(tmpAcpsal)), error);
}
// Report the outcome: a reply value different from the `-…` constant goes to
// std::cout, anything else is reported as an error on std::cerr.
if (tmpAcpsal.value != -) {
std::cout << " value is " << tmpAcpsal.value << std::endl;
system("pause");
return ;
}
else {
std::cerr << " value is " << tmpAcpsal.value << " . Error !!!" << std::endl;
system("pause");
return ;
} }
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
} system("pause"); return ;
}

propose

缺点:若抢占到提交权的 proposer 出现故障,则无法释放锁。

方案2:单点接受、多点提交,二段提交,根据 epoch 抢占提交权;若获取提交权的 proposer 出现故障,将会被拥有更高 epoch 的 proposer 替代。

// Accepter.cpp: 定义控制台应用程序的入口点。
// #include "stdafx.h"
#include <ctime>
#include <iostream>
#include <string>
#include <memory>
#include <mutex>
#include <thread>
#include <boost/asio.hpp> using boost::asio::ip::tcp; const int default_port = ; #pragma pack (1)
// Proposal data structure: the request that HandlePropose reads from the socket.
// NOTE(review): numeric literals (port, enum values, the `-…` initialisers) are
// missing from this listing, and statements that presumably stood on separate
// lines are fused (e.g. after `#pragma pack()`); restore both before compiling.
typedef struct PROPOSAL
{
int index; // phase of the current request
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}PROPOSAL; typedef struct ACCEPTSAL
{
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}ACCEPTSAL;
// ACCEPTSAL above is the reply layout: HandlePropose writes a copy of g_ServerRecord in it.
// INDEX: request phases compared against PROPOSAL::index.
#pragma pack() enum INDEX {
INIT_INDEX = ,
PREPARE_INDEX,
ACCEPT_INDEX,
FINISH_INDEX,
ERROR_INDEX = -,
}; int current_index = PREPARE_INDEX;
// The single acceptor record shared by all connection threads; accessed under g_mtx.
ACCEPTSAL g_ServerRecord = {-,- };
boost::asio::io_service io_service;
std::mutex g_mtx;
// Not referenced anywhere else in this program's visible code.
int acceptepoch = -;
// HandlePropose(socket): per-connection loop, started on a detached thread by main.
// Repeatedly reads one PROPOSAL, logs it, updates the shared g_ServerRecord under
// g_mtx according to the request phase, logs the reply and writes it back.
// Unlike a prepare, an accepted request stores both the epoch and the value.
// The loop ends silently when a socket operation throws.
// NOTE(review): the signature below shares its line with the `//====` separator,
// so as written it is part of that comment; it presumably belongs on its own line.
//========================================================== void HandlePropose(std::shared_ptr<tcp::socket> socket) {
PROPOSAL buf = { -,-,- };
ACCEPTSAL tmpAcpsal = { -,- }; try {
while () {
// Receive one complete PROPOSAL (sizeof(buf) bytes).
boost::asio::read(*socket, boost::asio::buffer(&buf, sizeof(buf)));
{
// g_mtx also serves as the console lock in this version.
std::lock_guard<std::mutex> lock(g_mtx);
std::cout << "recv " << "index = " << buf.index << ". epoch = " << buf.epoch << ". value = " << buf.value << std::endl;
} {
std::lock_guard<std::mutex> lock(g_mtx);
// prepare phase
if (buf.index == PREPARE_INDEX) {
if (g_ServerRecord.epoch <= buf.epoch && g_ServerRecord.value == -) {
std::cout << "Prepare index" << std::endl;
std::cout << "Prepare recv " << "index = " << buf.index << ". epoch = " << buf.epoch << ". value = " << buf.value << std::endl;
// remember the newest prepare epoch
g_ServerRecord.epoch = buf.epoch;
current_index = ACCEPT_INDEX;
}
}
// accept phase: only considered while no value has been stored yet
else if (buf.index == ACCEPT_INDEX && (- == g_ServerRecord.value)) {
if ((buf.epoch >= g_ServerRecord.epoch)) {
std::cout << "Accept index, epoch =" << buf.epoch << ".value = "<< buf.value << std::endl;
g_ServerRecord.epoch = buf.epoch;
g_ServerRecord.value = buf.value;
current_index = FINISH_INDEX;
}
}
// copy the acceptor's record
tmpAcpsal = g_ServerRecord;
}
// reply
{
std::lock_guard<std::mutex> lock(g_mtx);
std::cout << "reply epoch = " << tmpAcpsal.epoch << ". value = " << tmpAcpsal.value << std::endl;
}
// The write itself happens after the lock has been released.
boost::asio::write(*socket, boost::asio::buffer(&tmpAcpsal, sizeof(tmpAcpsal)));
}
}
catch (std::exception& e) {
// Error reporting is disabled here; the handler just ends.
//std::cerr << e.what() << std::endl;
return;
}
} int main()
{
// Entry point of the acceptor: listens on default_port (IPv4) and, for every
// accepted connection, starts a detached thread running HandlePropose on it.
// An exception ends the accept loop and is reported on std::cerr.
try {
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), default_port));
for (;;) {
// The socket is held by shared_ptr so the handler thread keeps it alive.
std::shared_ptr<tcp::socket> psocket = std::make_shared<tcp::socket>(io_service);
acceptor.accept(*psocket); std::thread t(HandlePropose,psocket);
t.detach();
}
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
} return ;
}

accepter

// Proposer.cpp: 定义控制台应用程序的入口点。
// #include "stdafx.h"
#include <ctime>
#include <iostream>
#include <string>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <boost/asio.hpp> using boost::asio::ip::tcp; const std::string default_port = ""; #pragma pack (1)
// Proposal data structure: the request each proposer thread writes to the socket.
// NOTE(review): numeric literals (enum values, port string) are missing from this
// listing and some line breaks appear to have been lost; restore before compiling.
typedef struct PROPOSAL
{
int index; // phase of the current request
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}PROPOSAL; typedef struct ACCEPTSAL
{
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}ACCEPTSAL;
// ACCEPTSAL above is the reply layout read back from the acceptor.
// INDEX: request phases stored in PROPOSAL::index.
#pragma pack() enum INDEX {
INIT_INDEX = ,
PREPARE_INDEX,
ACCEPT_INDEX,
FINISH_INDEX,
ERROR_INDEX = -,
};
//========================================================
// io_service is shared by every proposer thread's resolver and socket.
//
// GetRand(): returns the next number from a function-local, default-constructed
// std::default_random_engine through a uniform unsigned distribution; callers use
// the result as a delay in milliseconds.
// NOTE(review): the distribution bounds are missing from this listing, and the
// static engine is used from several threads with no lock visible here -
// confirm whether that needs synchronisation.
boost::asio::io_service io_service; unsigned GetRand()
{
static std::default_random_engine e;
static std::uniform_int_distribution<unsigned> u(, );
return u(e);
} void ProposeThreadFunc(int id) {
// ProposeThreadFunc(id): one proposer. Connects to 127.0.0.1:default_port and loops:
// send a prepare with an epoch derived from `epoch` and a value derived from `id`;
// if the reply echoes that epoch, wait a random time and send the accept; if the
// acceptor reports a higher epoch, recompute `epoch` from it; stop as soon as a reply
// carries a value, otherwise sleep a random time and retry.
// Exceptions are reported on std::cerr and end the thread function.
// NOTE(review): the arithmetic constants are missing from this listing.
try {
tcp::resolver resolver(io_service);
tcp::resolver::query query("127.0.0.1", default_port.c_str());
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); tcp::socket socket(io_service);
boost::asio::connect(socket, endpoint_iterator);
int epoch = id+;
for (;;) {
PROPOSAL pro = { PREPARE_INDEX ,epoch +,id + };
ACCEPTSAL tmpAcpsal = { -,- };
// NOTE(review): read_some() may deliver fewer than sizeof(tmpAcpsal) bytes and `len`
// is never checked; the error_code set by write() is overwritten by read_some()
// before it is inspected. Consider boost::asio::read() - confirm.
boost::system::error_code error; boost::asio::write(socket, boost::asio::buffer(&pro, sizeof(pro)), error); size_t len = socket.read_some(boost::asio::buffer(&tmpAcpsal, sizeof(tmpAcpsal)), error); if (error == boost::asio::error::eof)
break; // Connection closed cleanly by peer.
else if (error)
throw boost::system::system_error(error); // Some other error.
// The reply carries our epoch: after a random delay, resend the proposal as an accept request.
if (tmpAcpsal.epoch == pro.epoch) {
pro.index = ACCEPT_INDEX; std::chrono::milliseconds dura(GetRand());
std::this_thread::sleep_for(dura); boost::asio::write(socket, boost::asio::buffer(&pro, sizeof(pro)), error);
// `error` is not inspected after this write/read pair.
size_t len = socket.read_some(boost::asio::buffer(&tmpAcpsal, sizeof(tmpAcpsal)), error);
}
// The acceptor reported an epoch above ours: derive a new base epoch from it and `id`.
// `i` is computed but not used afterwards.
if (tmpAcpsal.epoch > epoch) {
int i = tmpAcpsal.epoch%;
int loopcount = tmpAcpsal.epoch / ;
epoch = loopcount * + + id + ;
if(id == )
std::cout << "epoch = " << epoch << std::endl;
}
// A value in the reply ends this proposer.
if (tmpAcpsal.value != -) {
std::cout << " value is " << tmpAcpsal.value << std::endl;
break ;
}
// Random back-off before the next prepare.
std::chrono::milliseconds dura(GetRand());
std::this_thread::sleep_for(dura);
}
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
}
} int main()
{
// Entry point of the multi-proposer client: starts one thread per proposer running
// ProposeThreadFunc(i), joins them all, then calls system("pause").
// NOTE(review): the thread count and loop bounds are missing from this listing.
std::thread t[];
for (int i = ; i < ; i++) {
t[i] = std::thread(ProposeThreadFunc,i);
}
for (int i = ; i < ; i++) {
t[i].join();
} system("pause");
return ;
}

propose

运行代码:添加随机参数后,最后被提交并接受的数值实现了随机化。

运行第一次 最后接受数值为6

paxos 练手 推进中-LMLPHP

运行第二次 最后接受数值为2

paxos 练手 推进中-LMLPHP

acceptor 扩展为多点;与之匹配的客户端待完成。

// accepters.cpp: 定义控制台应用程序的入口点。
// #include "stdafx.h"
#include <ctime>
#include <iostream>
#include <string>
#include <memory>
#include <mutex>
#include <thread>
#include <boost/asio.hpp> using boost::asio::ip::tcp; const int default_port = ; #pragma pack (1)
// Proposal data structure: the request that HandlePropose reads from the socket.
// NOTE(review): numeric literals (port, enum values, array bounds) are missing from
// this listing, and statements that presumably stood on separate lines are fused
// (e.g. after `#pragma pack()`); restore both before compiling.
typedef struct PROPOSAL
{
int index; // phase of the current request
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}PROPOSAL; typedef struct ACCEPTSAL
{
int epoch; // sequence number, increasing from 1, guaranteed globally unique
int value; // proposal content
}ACCEPTSAL;
// ACCEPTSAL above is the reply layout: HandlePropose writes a copy of g_ServerRecord[id] in it.
// INDEX: request phases compared against PROPOSAL::index.
#pragma pack() enum INDEX {
INIT_INDEX = ,
PREPARE_INDEX,
ACCEPT_INDEX,
FINISH_INDEX,
ERROR_INDEX = -,
};
//=======================================================
// records for multiple acceptors: every array below is indexed by acceptor id;
// the trailing `// = ...` fragments are the initialisers of the single-acceptor
// version, now applied per element in init().
boost::asio::io_service io_service; int current_index[];// = PREPARE_INDEX;
ACCEPTSAL g_ServerRecord[];// = { -1,-1 };
// g_mtx[id] is held while acceptor id's record is accessed; g_print_mtx around console output.
std::mutex g_mtx[];
std::mutex g_print_mtx;
// acceptepoch is not referenced anywhere else in this program's visible code.
int acceptepoch[];// = -1; //=====================================================
// HandlePropose(socket, id): per-connection loop for acceptor `id`, started on a
// detached thread by AcceptThreadFunc. Repeatedly reads one PROPOSAL, logs it,
// updates g_ServerRecord[id] under g_mtx[id] according to the request phase, logs
// the reply and writes it back. Wherever both locks are taken, g_mtx[id] is acquired
// before g_print_mtx. The loop ends silently when a socket operation throws.
// NOTE(review): the `-…` constants and the `while ()` condition are missing from this listing.
void HandlePropose(std::shared_ptr<tcp::socket> socket,int id) {
PROPOSAL buf = { -,-,- };
ACCEPTSAL tmpAcpsal = { -,- }; try {
while () {
// Receive one complete PROPOSAL (sizeof(buf) bytes).
boost::asio::read(*socket, boost::asio::buffer(&buf, sizeof(buf)));
{
std::lock_guard<std::mutex> lock(g_mtx[id]);
std::lock_guard<std::mutex> printLock(g_print_mtx);
std::cout << "recv " << "index = " << buf.index << ". epoch = " << buf.epoch << ". value = " << buf.value << std::endl;
} {
std::lock_guard<std::mutex> lock(g_mtx[id]);
// prepare phase
if (buf.index == PREPARE_INDEX) {
if (g_ServerRecord[id].epoch <= buf.epoch && g_ServerRecord[id].value == -) {
{
std::lock_guard<std::mutex> printLock(g_print_mtx);
std::cout << "Prepare index" << std::endl;
std::cout << "Prepare recv " << "index = " << buf.index << ". epoch = " << buf.epoch << ". value = " << buf.value << std::endl;
}
// remember the newest prepare epoch
g_ServerRecord[id].epoch = buf.epoch;
current_index[id] = ACCEPT_INDEX;
}
}
// accept phase: only considered while this acceptor has stored no value yet
else if (buf.index == ACCEPT_INDEX && (- == g_ServerRecord[id].value)) {
if ((buf.epoch >= g_ServerRecord[id].epoch)) {
{
std::lock_guard<std::mutex> printLock(g_print_mtx);
std::cout << "Accept index, epoch =" << buf.epoch << ".value = " << buf.value << std::endl;
}
g_ServerRecord[id].epoch = buf.epoch;
g_ServerRecord[id].value = buf.value;
current_index[id] = FINISH_INDEX;
}
}
// copy the acceptor's record
tmpAcpsal = g_ServerRecord[id];
}
// reply
{
std::lock_guard<std::mutex> lock(g_mtx[id]);
{
std::lock_guard<std::mutex> printLock(g_print_mtx);
std::cout << "reply epoch = " << tmpAcpsal.epoch << ". value = " << tmpAcpsal.value << std::endl;
}
}
// The write itself happens after both locks have been released.
boost::asio::write(*socket, boost::asio::buffer(&tmpAcpsal, sizeof(tmpAcpsal)));
}
}
catch (std::exception& e) {
// Error reporting is disabled here; the handler just ends.
//std::cerr << e.what() << std::endl;
return;
}
} void AcceptThreadFunc(int id) {
// AcceptThreadFunc(id): listener for acceptor `id`. Listens on port default_port + id
// (IPv4) and, for every accepted connection, starts a detached thread running
// HandlePropose(socket, id). An exception ends the accept loop and is reported
// on std::cerr.
try {
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), default_port+id));
for (;;) {
// The socket is held by shared_ptr so the handler thread keeps it alive.
std::shared_ptr<tcp::socket> psocket = std::make_shared<tcp::socket>(io_service);
acceptor.accept(*psocket); std::thread t(HandlePropose, psocket,id);
t.detach();
}
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
}
} void init() {
// init(): resets every acceptor slot to its starting state - phase PREPARE_INDEX and
// an epoch/value pair set to the `-…` constant (the commented-out initialisers on
// the array declarations suggest -1 - confirm).
// NOTE(review): the loop bounds are missing from this listing.
for (int i = ; i < ; i++) {
current_index[i] = PREPARE_INDEX;
g_ServerRecord[i].epoch = -;
g_ServerRecord[i].value = -;
}
} int main()
{
// Entry point of the multi-acceptor server: initialises the per-acceptor state,
// starts one listener thread per acceptor running AcceptThreadFunc(i), and joins them.
// NOTE(review): the thread count and loop bounds are missing from this listing.
init();
std::thread t[];
for (int i = ; i < ; i++) {
t[i] = std::thread(AcceptThreadFunc,i);
}
for (int i = ; i < ; i++) {
t[i].join();
} return ;
}
04-25 22:58