参考文献:

  1. 深入剖析 redis 事件驱动
  2. Redis 中的事件循环
  3. 深入了解epoll (转)
  4. Redis自己的事件模型 ae
  5. EPOLL(7)
  6. Linux IO模式及 select、poll、epoll详解
  7. epoll为什么这么快,epoll的实现原理

概述

在redis中,对于对于文件事件的处理采用了Reactor模型。总体来说,就是将io多路复用所监听到的文件去处,并放入一个队列中依次处理。接下去本文以一个io多路复用的例子开始,一步步还原redis文件事件的运行过程

epoll (本节从Linux IO模式及 select、poll、epoll详解摘抄)

epoll使用的过程中需要如下的三个接口:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

int epoll_create(int size)

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。

当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

函数是对指定描述符fd执行op操作。

  • epfd:是epoll_create()的返回值。
  • op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
  • fd:是需要监听的fd(文件描述符)
  • epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
}; //events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待epfd上的io事件,最多返回maxevents个事件。

参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

一个epoll的示例

有了上面的论述,用一个简单的例子来说明下epoll的使用(来自http://man7.org/linux/man-pages/man7/epoll.7.html):

#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd; /* Code to set up listening socket, 'listen_sock',
(socket(), bind(), listen()) omitted */ epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
} ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
} for (;;) {
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
} for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock,
(struct sockaddr *) &addr, &addrlen);
if (conn_sock == -1) {
perror("accept");
exit(EXIT_FAILURE);
}
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
perror("epoll_ctl: conn_sock");
exit(EXIT_FAILURE);
}
} else {
do_use_fd(events[n].data.fd);
}
}
}

redis 文件事件模型-LMLPHP

如图所示,可以看出使用epoll的过程。接下来将介绍redis事件驱动模型中主要涉及的数据结构。

redis事件驱动模型

数据结构

redis事件驱动模型中主要涉及到如下的几个数据结构:

  1. aeCreateEventLoop.
  2. aeApiState.
  3. aeFileEvent.
  4. aeFiredEvent.

    redis事件处理的核心是aeCreateEventLoop结构,如图可以看出主要的结构体如下:

    redis 文件事件模型-LMLPHP
typedef struct aeEventLoop {

    // 目前已注册的最大描述符
int maxfd; /* highest file descriptor currently registered */ // 目前已追踪的最大描述符
int setsize; /* max number of file descriptors tracked */ // 用于生成时间事件 id
long long timeEventNextId; // 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */ // 已注册的文件事件
aeFileEvent *events; /* Registered events */ // 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */ // 时间事件
aeTimeEvent *timeEventHead; // 事件处理器的开关
int stop; // 多路复用库的私有数据
void *apidata; /* This is used for polling API specific data */ // 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep; } aeEventLoop;

其中aeFileEvent 结构体为已经注册并需要监听的事件的结构体。在redis初始化的时候会创建一个 setSizesizeof(aeFileEvent) 以及一个 setSizesiezeof(aeFiredEvent) 大小的内存,用文件描述符作为其索引。那么这个大小定位多少合适呢?在Linux个中,文件描述符是个有限的资源,当打开一个文件时就会消耗一个文件描述符,当关闭该文件描述符或者程序结束时会释放该文件描述符资源,从而供其他文件打开操作使用。当文件描述符超过最大值后,打开文件就会出错。那么这个最大值是多少呢?可以通过/proc/sys/fs/file-max看到系统支持的最大的文件描述符数。通过 ulimit -n 可以看到当前用户能打开的最大的文件描述符。在我这里的一台8g内存的机器上,系统支持最大的文件描述是365146。而在这台64bit的机器上 sizeof(aeFiredEvent) + sizeof(aeFileEvent) 大小为40byte。按系统最大支持的文件描述符来算,固定消耗内存为14.6M。这样以文件描述符作为数组的下标来索引,虽然这样的哈希在接入量不大的情况下会有大量的浪费。但是最多也就浪费14M 的内存,因此这样的设计是可取的。【4】

typedef struct aeFileEvent {

    // 监听事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */ // 读事件处理器
aeFileProc *rfileProc; // 写事件处理器
aeFileProc *wfileProc; // 多路复用库的私有数据
void *clientData; } aeFileEvent;

aeFiredEvent结构体是已经监听到有事件发生的描述符的集合。

typedef struct aeFiredEvent {

    // 已就绪文件描述符
int fd; // 事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是两者的或
int mask; } aeFiredEvent;

void *apidata;在ae创建的时候,会被赋值为aeApiState结构体,结构体的定义如下:

typedef struct aeApiState {

    // epoll_event 实例描述符
int epfd; // 事件槽
struct epoll_event *events; } aeApiState;

可以见得,这个结构体是为了epoll所准备的数据结构。redis可以选择不同的io多路复用方法。因此 apidata 是个void类型,根据不同的io多路复用库来选择。

Reactor模型的创建与使用

aeEventLoop 的创建

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i; // 创建事件状态结构
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; // 初始化文件事件结构和已就绪文件事件结构数组
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
// 设置数组大小
eventLoop->setsize = setsize;
// 初始化执行最近一次执行时间
eventLoop->lastTime = time(NULL); // 初始化时间事件结构
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0; eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
// 初始化监听事件
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE; // 返回事件循环
return eventLoop; err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

如下图所示,可以见得在初始化的时候创建结构体的流程。

graph LR
创建aeFileEvent-->创建aeFireEvent
创建aeFireEvent-->调用aeApiCreate创建aeApiState

函数aeApiCreate则创建了一个epoll所需要的数据结构。

/*
* 创建一个新的 epoll 实例,并将它赋值给 eventLoop
*/
static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; // 初始化事件槽空间
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
} // 创建 epoll 实例
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
} // 赋值给 eventLoop
eventLoop->apidata = state;
return 0;
}

aeFileEvent的注册

在创建了aeEventLoop之后,对于需要监听的文件描述符需要进行注册,在aeFileEvent结构体中,可以看到如下的两个结构aeFileProc *rfileProc和aeFileProc *wfileProc,就是在注册监听事件的时候进行赋值的。

函数aeCreateFileEvent执行创建aeFileEvent和添加文件句柄到epoll中。

/*
* 根据 mask 参数的值,监听 fd 文件的状态,
* 当 fd 可用时,执行 proc 函数
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
} if (fd >= eventLoop->setsize) return AE_ERR; // 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd]; // 监听指定 fd 的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR; // 设置文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc; // 私有数据
fe->clientData = clientData; // 如果有需要,更新事件处理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd; return AE_OK;
}

其中aeApiAddEvent函数就是在开头之中epoll例子中添加一个文件描述符到监听集合中的方法封装函数:

/*
* 关联给定事件到 fd
*/
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee; /* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation.
*
* 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。
*
* 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。
*/
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD; // 注册事件到 epoll
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0;
}

事件驱动模型的运行过程

到这里redis事件驱动的主要数据结构和初始化的方法已经介绍完毕。接下来将展示事件驱动的运行过程。在redis源码中,省略去其他部分,跟事件驱动相关的代码如下:

    server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
// 为 TCP 连接关联连接应答(accept)处理器
// 用于接受并应答客户端的 connect() 调用
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
} // 为本地套接字关联应答处理器
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
aeMain(server.el);

其中aeCreateEventLoop和aeCreateFileEvent函数在之前已经介绍过。接下来重点介绍下aeMain函数:


void aeMain(aeEventLoop * eventLoop) { eventLoop->stop = 0;
while (!eventLoop->stop){
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

我们可以看出,aeMain函数中主要调用了aeProcessEvents处理事件,aeProcessEvents中我们略去其他的代码,主要关注如下的部分:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
....
// 处理文件事件,阻塞时间由 tvp 决定
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 读事件
if (fe->mask & mask & AE_READABLE) {
// rfired 确保读/写事件只能执行其中一个
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
} processed++;
}
....
}

可以看出函数aeProcessEvents调用了aeApiPoll获取已经就绪的事件。在for循环中,从eventLoop->fired(已经就绪的事件)中取出事件结构体,然后根据是读时间还是写事件进行处理。在aeApiPoll中,就可以看到我们熟悉的

epoll_wait的身影。可以见得通过调用系统的epoll_wait函数,然后将已经就绪的事件放入 eventLoop->fired中。

/*
* 获取可执行事件
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0; // 等待时间
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // 有至少一个事件就绪?
if (retval > 0) {
int j; // 为已就绪事件设置相应的模式
// 并加入到 eventLoop 的 fired 数组中
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
} // 返回已就绪事件个数
return numevents;
}

到这里还有一个疑问,在redis初始化的时候只注册了tcp和本地套接字的描述符,那么当有个新的客户端连接进来的时候,是怎么将客户端的描述符加到监听队列里面的呢?答案就在最开始的acceptTcpHandler函数中。在这个函数中依次调用了acceptCommonHandler->createClient->aeCreateFileEvent函数。可以见得当监听的一个tcp或者本地socket产生了connect 事件的时候,就会依次调用这些函数,然后将新的客户端端描述符加入监听中。

总结

redis的事件驱动模型分析就到这里,总体而言还是比较直观的。这中间也学习了很多,包括epoll的原理等。

05-17 02:28