reactor
reactor是是服务器的重要模型, 是一种事件驱动的反应堆模式
通过epoll_create() 创建句柄, epoll_ctrl()提前注册好不同的事件处理函数 , 当事件到来就由 epoll_wait () 获取同时到来的多个事件,并且根据数据的不同类型将事件分发给事件处理机制 (事件处理器),通过回调函数方式实现响应的功能(如创建客户端fd, 读/写IO)
优点:
- 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的;
- 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/ 进程的切换开销
- 可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源
- 可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性
流程:
- 注册事件 和 对应的事件处理器
- 多路复用器等待事件到来
- 事件到来,激发事件分发器分发事件到对应的处理器
- 事件处理器处理事件,然后注册新的事件 (如fu武器接收buffer 后 发送buffer)
api
- epol_create: 创建fd
int epoll_create(int size);
创建一个epoll的句柄,size通常为1,当创建好epoll句柄后,它就是会占用一个fd
- epoll_ctrl: epoll的事件注册函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll_ctl向 epoll对象中添加、修改或者删除感兴趣的事件,返回0表示成功,否则返回–1,此时需要根据errno错误码判断错误类型
epfd : epoll_create()的返回值
op : 表示动作
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
fd : 需要监听的fd
*event : 告诉内核需要监听什么事
struct epoll_event结构:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
- epoll_wait: 等待事件产生,类似与select调用
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epfd: epoll的描述符
*event : events则是分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中
maxevnets : 表示本次可以返回的最大事件数目,通常 maxevents参数与预分配的events数组的大小是相等的。
timeout : 表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待
code
reactor 封装
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <fcntl.h>
typedef struct sockaddr SA;
#define BUFFSIZE 1024
struct sockitem {
int sockfd;
//事件处理器,处理函数回调接口
int (*callback)(int fd, int events, void* arg);
//读写函数
char recvbuffer[BUFFSIZE];
char sendbuffer[BUFFSIZE];
//读写字节数
int rlen;
int slen;
};
struct reactor {
int epfd;
struct epoll_event events[512];
};
//定义全局的eventloop --> 事件循环
struct reactor* eventloop = NULL;
//申明这些事件处理器函数
int recv_cb(int fd, int events, void *arg);
int accept_cb(int fd, int events, void* arg);
int send_cb(int fd, int evnts, void* arg);
int recv_cb(int fd, int events, void *arg) {
struct sockitem* si = (struct sockitem*)arg;
struct epoll_event ev;//后面需要
//处理IO读事件
int ret = recv(fd, si->recvbuffer, BUFFSIZE, 0);
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { //
return -1;
} else {
}
//出错了,从监视IO事件红黑树中移除结点,避免僵尸结点
ev.events = EPOLLIN;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
close(fd);
free(si);
} else if (ret == 0) {
//对端断开连接
printf("fd %d disconnect\n", fd);
ev.events = EPOLLIN;
epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
//close同一断开连接,避免客户端大量的close_wait状态
close(fd);
free(si);
} else {
//打印接收到的数据
printf("recv: %s, %d Bytes\n", si->recvbuffer, ret);
//设置sendbuffer
si->rlen = ret;
memcpy(si->sendbuffer, si->recvbuffer, si->rlen);
si->slen = si->rlen;
//注册写事件处理器
struct epoll_event ev;
ev.events = EPOLLOUT | EPOLLET;
si->sockfd = fd;
si->callback = send_cb;
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int accept_cb(int fd, int events, void* arg) {
//处理新的连接。 连接IO事件处理流程
struct sockaddr_in cli_addr;
memset(&cli_addr, 0, sizeof(cli_addr));
socklen_t cli_len = sizeof(cli_addr);
int cli_fd = accept(fd, (SA*)&cli_addr, &cli_len);
if (cli_fd <= 0) return -1;
char cli_ip[INET_ADDRSTRLEN] = {0}; //存储cli_ip
printf("Recv from ip %s at port %d\n", inet_ntop(AF_INET, &cli_addr.sin_addr, cli_ip, sizeof(cli_ip)),
ntohs(cli_addr.sin_port));
//注册接下来的读事件处理器
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = cli_fd;
si->callback = recv_cb;//设置事件处理器
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, cli_fd, &ev);
return cli_fd;
}
int send_cb(int fd, int events, void* arg) {
//处理send IO事件
struct sockitem *si = (struct sockitem*)arg;
send(fd, si->sendbuffer, si->slen, 0);
//再次注册IO读事件处理器
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
si->sockfd = fd;
si->callback = recv_cb;//设置事件处理器
ev.data.ptr = si;
epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
}
int main(int argc, char* argv[]) {
if (argc != 2) {
fprintf(stderr, "uasge: %s <port>", argv[0]);
return 1;
}
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr;
int port = atoi(argv[1]);
//确定服务端协议地址簇
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(port);
//进行绑定
if (-1 == bind(sockfd, (SA*)&serv_addr, sizeof(serv_addr))) {
fprintf(stderr, "bind error");
return 2;
}
if (-1 == listen(sockfd, 5)) {
fprintf(stderr, "listen error");
return 3;
}
//init eventloop
eventloop = (struct reactor*)malloc(sizeof(struct reactor));
//创建epoll句柄.
eventloop->epfd = epoll_create(1);
//注册建立连接IO事件处理函数
struct epoll_event ev;
ev.events = EPOLLIN;
struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = sockfd;
si->callback = accept_cb;//设置事件处理器
ev.data.ptr = si;
//将监视事件加入到reactor的epfd中
epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev);
while (1) {
//多路复用器监视多个IO事件
int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1);
if (nready < -1) {
break;
}
int i = 0;
//循环分发所有的IO事件给处理器
for (i = 0; i < nready; ++i) {
if (eventloop->events[i].events & EPOLLIN) {
struct sockitem* si = (struct sockitem*)eventloop->events[i].data.ptr;
si->callback(si->sockfd, eventloop->events[i].events, si);
}
if (eventloop->events[i].events & EPOLLOUT) {
struct sockitem* si = (struct sockitem*)eventloop->events[i].data.ptr;
si->callback(si->sockfd, eventloop->events[i].events, si);
}
}
}
return 0;
}
原文链接:https://blog.csdn.net/weixin_53695360/article/details/123894158