项目地址在这里: https://gitee.com/penggli_2_0/TcpServer
1 项目介绍
这是一个仿muduo库One Thread One Loop式主从Reactor模型实现⾼并发服务器项目。是对基于事件驱动Reactor模型的改良版!
通过实现的高并发服务器组件,可以简洁快速的完成⼀个⾼性能的服务器搭建。并且,通过组件内提供的不同应⽤层协议⽀持,也可以快速完成⼀个高性能应用服务器的搭建(当前为了便于项目的演⽰,项目中提供HTTP协议组件的支持)。在这里,要明确的是咱们要实现的是⼀个高并发服务器组件,因此当前的项目中并不包含实际的业务内容!
实现的是主从Reactor模型服务器!分为两个部分:主Reactor 与 子Reactor:
- 主Reactor线程仅仅监控监听描述符,获取新建连接,保证获取新连接的高效,提⾼服务器的并发性能。主Reactor获取到新连接后分发给⼦Reactor进⾏通信事件监控。
- 子Reactor线程监控各⾃的描述符的读写事件进行数据读写以及业务处理!
One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进行,⼀个线程对应⼀个事件处理的循环。
当前实现中,因为并不确定组件使用者的意向,因此不提供业务层工作线程池的实现,只实现主从Reactor,而Worker工作线程池,可由组件库的使用者的需要自行决定是否使用和实现。
2 模块组成
在这项目中我们需要两大模块:
- 协议模块:协议是双方进行通信的基础,这是一定要实现的!
- sever服务模块:实现Reactor模型的TCP服务器,实现高效派发任务!
SERVER模块就是对所有的连接以及线程进⾏管理,让它们各司其职,在合适的时候做合适的事,最终完成高性能服务器组件的实现。具体的管理也分为三个方面:
- 监听连接管理:对监听连接进⾏管理。
- 通信连接管理:对通信连接进⾏管理。
- 超时连接管理:对超时连接进⾏管理。
基于以上的管理思想,将这个模块进⾏细致的划分⼜可以划分为以下多个子模块:
-
Buffer模块:
Buffer模块是⼀个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能。先前的文章中我们使用string模拟过缓冲区,这里需要进行丰富功能! -
Socket模块:
Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。 -
Channel模块:
Channel模块是对⼀个描述符需要进⾏的IO事件管理的模块,实现对描述符可读,可写,错误等事件的管理操作,以及Poller模块对描述符进⾏IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能。 -
Connection模块:
Connection模块是对Buffer模块,Socket模块,Channel模块的⼀个整体封装,实现了对⼀个通信套接字的整体的管理,每⼀个进⾏数据通信的套接字(也就是accept获取到的新连接)都会使用Connection进⾏管理。 -
Acceptor模块:
Acceptor模块是对Socket模块,Channel模块的⼀个整体封装,实现了对⼀个监听套接字的整体的管理。 -
TimerQueue模块:
TimerQueue模块是实现固定时间定时任务的模块,可以理解就是要给定时任务管理器,向定时任务管理器中添加⼀个任务,任务将在固定时间后被执行,同时也可以通过刷新定时任务来延迟任务的执行。这个模块主要是对Connection对象的⽣命周期管理,对非活跃连接进⾏超时后的释放功能 -
Poller模块:
Poller模块是对多路转接epoll进⾏封装的⼀个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。 -
EventLoop模块:
EventLoop模块可以理解就是我们上边所说的Reactor模块,进行事件派发。它是对Poller模块,TimerQueue模块,Socket模块的⼀个整体封装,进⾏所有描述符的事件监控。EventLoop模块必然是⼀个对象对应⼀个线程的模块,线程内部的目的就是运行EventLoop的启动函数。 -
TcpServer模块:
这个模块是⼀个整体Tcp服务器模块的封装,内部封装了Acceptor模块,EventLoopThreadPool模块。
3 实现时间轮模块
3.1 设计思想
时间轮思想:时间轮的思想来源于钟表,如果我们定了⼀个3点钟的闹铃,则当时针⾛到3的时候,就代表时间到了。
同样的道理,如果我们定义了⼀个数组,并且有⼀个指针,指向数组起始位置,这个指针每秒钟向后⾛动⼀步,⾛到哪⾥,则代表哪⾥的任务该被执⾏了,那么如果我们想要定⼀个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中⾛⼀步,三秒钟后tick⾛到对应位置,这时候执⾏对应位置的任务即可。
3.2 定时任务类
这里先对时间轮模块进行一个简单实现:
首先需要设计一个TimeTask 定时任务类
- 成员变量:
- 任务ID uint64_t _id :用来标识任务
- 超时时间 uint32_t _timeout
- 回调任务 _task_cb void()类型
- 释放操作 _release :用于删除TimeWheel保存的定时器对象信息
- 成员函数
- 构造函数
- 析构函数 :析构的时候执行定时任务,这样就可以通过释放空间实现定时实现!
- SetRelease函数设置 _release 任务
- DelayTime函数 返回延迟时间
class Timer
{
private:
uint64_t _id; // 任务Id
uint32_t _timeout; // 延迟时间
bool _canceled; // false表示没有被取消了 true表示被取消了
Task_t _task_cb; // 定时任务
Release_t _release; // 释放操作
public:
Timer(uint64_t id, uint32_t timeout, Task_t task) : _id(id),
_timeout(timeout),
_canceled(false),
_task_cb(task)
{
}
void Cancel()
{
_canceled = true;
}
~Timer()
{
if (_canceled == false)
_task_cb();
_release();
}
// 设置释放操作
void SetRelease(Release_t release)
{
_release = release;
}
// 返回延迟时间
uint32_t DelayTime()
{
return _timeout;
}
};
3.3 TimeWheel时间轮类
- 成员变量:
- 二维数组时间轮,每一节点储存PtrTask数组
- 当前时间指针 int _tick; 走到哪里 执行哪里的任务
- 表盘最大数量(默认60秒)
- 定时器任务ID映射表 unordered_map<uint64_t , WeakPtr> _timers
- 成员函数
- 构造函数
- 析构函数
- 哈希表操作函数 RemoveTimer 删除对应ID的对象
- TimerAdd函数添加定时任务 :创建一个TimeTask对象指针 ,设置RemoveTimer给_release ,将对象指针设置进哈希表中 注意使用WeakPtr!将任务添加到时间轮中
- TimerRefresh 刷新延迟定时任务:通过保存的定时器对象的Weak_ptr构造一个TaskPtr ,添加到时间轮中
- RunTimerTask 运行任务时间轮,向后移动一个位置,释放该位置的资源 会自动执行析构函数 运行任务
class TimeWheel
{
using TaskPtr = std::shared_ptr<Timer>;
using WeakPtr = std::weak_ptr<Timer>; // 辅助shared_ptr 不会增加引用计数
private:
int _capacity; // 最大容量 表盘最大数量(默认60秒)
int _tick; // 移动表针
std::vector<std::vector<TaskPtr>> _wheel; // 时间轮
std::unordered_map<uint64_t, WeakPtr> _timers; // 定时任务对象哈希表
private:
void RemoveTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it != _timers.end())
{
_timers.erase(it);
}
}
public:
TimeWheel() : _capacity(gnum), _tick(0), _wheel(_capacity, std::vector<TaskPtr>())
{
}
void TimerAdd(uint64_t id, int delay, Task_t cb)
{
TaskPtr p(new Timer(id, delay, cb));
p->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id));
_timers[id] = WeakPtr(p); // 进行映射 注意是WeakPtr
// 放入时间轮
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(p);
}
void TimerRefresh(uint64_t id)
{
// 更新任务
auto it = _timers.find(id);
if (it == _timers.end())
return;
// 通过WeakPtr构造一个shared_ptr
TaskPtr p = it->second.lock();
int delay = p->DelayTime();
// 放入时间轮
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(p);
}
void RunTimerTask()
{
_tick = (_tick + 1) % _capacity;
_wheel[_tick].clear();
}
void TimerCancel(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
return;//没有找到直接退出!
TaskPtr p = it->second.lock();
if (p)
p->Cancel();
}
~TimeWheel()
{
}
};