实现Reactor反应堆模型
1 重新认识Reactor
Reactor是反应堆模型,那么什么叫反应堆呢?反应堆可以理解为对应事件的管理容器!
反应堆中会有一个一个的连接Connection对象(对文件描述符fd的封装),通过Reactor内部的EPOLL模型,获取到事件可以激活对应文件描述符,执行事件!执行事件会分为listenfd与普通套接字,都有对应的上层模块!
简单来说,反应堆模型可以理解为打地鼠:
这里的洞代表一个一个的Connection
,整个土地是Reactor
模型,地鼠就是事件。地鼠处理,就要使用锤子处理事件!
反应堆模型是基于事驱动的网络服务器设计的主流模式。
Reactor模型的核心思想是将多个并发请求通过同步事件多路分解和分发到相应的请求处理线程(或进程),从而实现高吞吐量和低延迟的服务。
这样的反应堆模型有mudou库,libevent库供我们使用。
2 普通线程池
线程:在进程内部运行,是CPU调度的基本单位。同一个进程中的线程共享地址空间,但也有独立的栈空间。
之前我们实现过线程池:其中有一个任务队列,用于分配任务给线程执行。那么如何将Reactor与线程池耦合起来?我们可以把报文解析方法交给线程池来执行,线程池内部会调用线程执行这个任务!也就是把业务处理的控制权交给线程池!
线程池处理时,如果没发完依然会调用EPOLL将写事件写入到内核中进行托管!下一次再次调用进行发送!
void Recver(Connection *conn)
{
// LOG(DEBUG , "client发送信息: %d\n" , conn->Sockfd());
// 进行正常读写 --- 非阻塞读取
while (true)
{
char buffer[buffersize];
ssize_t n = ::recv(conn->Sockfd(), buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
// buffer是一个数据块 添加到conn的输入缓冲区中
buffer[n] = 0;
conn->AppendInbuffer(buffer);
// 数据交给上层处理
}
else if (n == 0)
{
// 连接断开
LOG(INFO, "客户端[%s]退出, 服务器准备关闭fd: %d\n", conn->GerInetAddr().AddrStr().c_str(), conn->Sockfd());
conn->_handler_excepter(conn); // 统一执行异常处理
return;
}
else
{
// 本轮数据读完了
if (errno == EWOULDBLOCK)
{
// 这是唯一出口
break;
}
// 信号中断
else if (errno == EINTR)
{
continue;
}
// 出现异常
else
{
conn->_handler_excepter(conn);
return;
}
}
}
// 读取完毕,我们应该处理数据了!
// 加入协议
// std::cout << "Inbuffer 内容:" << conn->Inbuffer() << std::endl;
//_process(conn);
// 加入线程池 --- 进行绑定
PackageParse parse;
task_t func = std::bind(&PackageParse::Execute, &parse, conn);
_tp->Equeue(func);
}
//....
// 线程池
ThreadPool<task_t> *_tp = ThreadPool<task_t>::GetInstance();
我们可以来看一下效果:
这样业务处理的逻辑就交给了线程池来进行处理!
但是这里面有两点是很不舒服的:
- 如果线程池现在正在处理fd的读事件,而此时此刻该fd的读事件在托管中就绪了,那么又会启动一个线程处理同一个fd的读事件!这样就冲突了!线程不安全!
- 我们的处理中Reactor是可以进行数据发送的,线程池也可以进行数据发送,这样会有可能进行冲突!线程不安全!
所以首先对于Connection内部的输入输出缓冲区是要加锁来进行保护的!然后重要的是,让线程池不能发送数据,线程池想要发送数据必须也通过EPOLL来进行发送!
线程池的回调函数设置为这样:
void Execute(Connection *conn)
{
LOG(INFO, "service start!!!\n");
while (true)
{
// 1.报文解析
std::string str = Decode(conn->Inbuffer()); // 通过去报头获取报文
// std::cout << "str: " << str << std::endl;
// 连接当前没有完整的报文! --- 直接退出
if (str.empty())
break;
// 到这里说明有完整的报文!!!
auto req = Factory::BuildRequestDefault();
// 2.反序列化 得到 Request
req->Deserialize(str);
// auto res = Factory::BuildResponseDefault();
// 3.业务处理
auto res = cal.Calculator(req);
// 4.进行序列化处理
std::string ret;
res->Serialize(&ret);
std::cout << "ret: " << ret << std::endl;
// 5.加入报头
std::string package = Encode(ret);
// std::cout << "package: \n"<< package << std::endl;
// 6.发回数据
// 直接进行发送 , 怎么知道写入条件不满足呢? 通过错误码errno是EAGAIN即可。
conn->AppendOutbuffer(package);
}
// 到了这里 说明至少处理了一个请求 只是有一个应答
// 进行发回数据
// 方法1:直接发回数据
// if (!conn->Outbuffer().empty())
// conn->_handler_sender(conn);
// 方法2:将写事件设置进入EPOLL就可以了 会自动进行读取
// 我不负责发送!
if (!conn->Outbuffer().empty())
conn->GetReactor()->EnableConnectionReadWrite(conn->Sockfd(), true, true);
}
这样激活一下对fd的写事件关心,从此以后所有的IO全部都由Reactor自动处理,线程池只需要复杂线程安全的处理请求和应答即可!
这种模式叫做半同步半异步模式:
- Reactor处理检测事件就绪和IO处理,这里是同步进行的!
- 线程池处理业务处理,是并发异步处理的!
共用7个部分处理:
- 接受请求
- 解析报文
- 反序列化
- 业务处理
- 序列化
- 构建应答
- 发送应答
上面的方案中,Reactor负责的是接受请求工作,其余交给线程池处理,这样就会有同一个fd被多个线程使用的情况!
3 OTOL方案
那么还可以怎么做呢?
我们让Reactor一个人直接做所有的事情!这样肯定不会处理一个fd被多线程处理的情况,但是要怎么做才能保证多线程的效率还可以保证安全呢?
- 我们使用线程池,每一个线程中都有一个Reactor执行所有的任务!这样就会有多个Reactor多执行流处理任务!
- 然后通过一个中心Reactor调控这些普通Reactor!不负责IO和业务处理,只负责连接事件和新的Sockfd的派发工作!
这种方案叫做One Thread One Loop
!可以通过多进程和多线程实现
3.1 多进程版
- 主进程将listensockfd添加到自己的Reactor中!只有他可以获取新连接,一旦有新连接到来,父进程不负责读取!而是通过管道告诉子进程底层就绪了,可以获取连接。
- 管道读端
rfd
也是文件描述符,子进程中的Reactor对rfd
进行等待. - 子进程可以通过对该
rfd
的关心知道此时可以获取连接,然后进行读取获取新连接了!在子进程中负责这个Sockfd的生命周期!
3.2 多线程版
- 主线程将listensockfd添加到自己的Reactor中!只有他可以获取新连接,一旦有新连接到来,主线程将文件描述符暂时存储到一个容器中。
- 当主线程获取到新连接,此时会有一个容器储存着文件描述符,此时通过命名管道唤醒新线程将文件描述符读取走!
- 这里保证线程安全不能使用条件变量!因为新线程不仅要等待连接,还需要处理业务!所以只能子啊epoll等待,其他场景不能等待!