http://blog.csdn.net/liangyuannao/article/details/7776057
先说Select:
1.Socket数量限制:该模式可操作的Socket数由FD_SETSIZE决定,内核默认32*32=1024.
2.操作限制:通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍.后说Poll:
1.Socket数量几乎无限制:该模式下的Socket对应的fd列表由一个数组来保存,大小不限(默认4k).
2.操作限制:同Select.
再说:Epoll:
1.Socket数量无限制:同Poll
2.操作无限制:基于内核提供的反射模式,有活跃Socket时,内核访问该Socket的callback,不需要遍历轮询.
总体来说:
大部分情况下,反射的效率都比遍历来的高,但是!
但是当所有Socket都活跃的时候,反射还会更高么?这时候所有的callback都被唤醒,会导致资源的竞争.既然都是要处理所有的Socket,那么遍历是最简单最有效的实现方式.
举例来说:
对于IM服务器,服务器和服务器之间都是长链接,但数量不多,一般一台60\70个,比如采用ICE这种架构设计,但请求相当频繁和密集,这时候通过反射唤醒callback不一定比用select去遍历处理更好.
对于web portal服务器,都是浏览器客户端发起的http短链接请求,数量很大,好一点的网站动辄每分钟上千个请求过来,同时服务器端还有更多的闲置等待超时的Socket,这时候没必要把全部的Socket都遍历处理,因为那些等待超时的请求是大多数的,这样用Epoll会更好.
1.Socket数量几乎无限制:该模式下的Socket对应的fd列表由一个数组来保存,大小不限(默认4k).
2.操作限制:同Select.
1.Socket数量无限制:同Poll
2.操作无限制:基于内核提供的反射模式,有活跃Socket时,内核访问该Socket的callback,不需要遍历轮询.
大部分情况下,反射的效率都比遍历来的高,但是!
但是当所有Socket都活跃的时候,反射还会更高么?这时候所有的callback都被唤醒,会导致资源的竞争.既然都是要处理所有的Socket,那么遍历是最简单最有效的实现方式.
对于IM服务器,服务器和服务器之间都是长链接,但数量不多,一般一台60\70个,比如采用ICE这种架构设计,但请求相当频繁和密集,这时候通过反射唤醒callback不一定比用select去遍历处理更好.
对于web portal服务器,都是浏览器客户端发起的http短链接请求,数量很大,好一点的网站动辄每分钟上千个请求过来,同时服务器端还有更多的闲置等待超时的Socket,这时候没必要把全部的Socket都遍历处理,因为那些等待超时的请求是大多数的,这样用Epoll会更好.
**************************************************************************************************************************************************************************************
epoll和select/poll的区别
1. 支持一个进程打开大数目的socket描述符(FD)
select最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降;二是可以选择多进程的解决方案(传统的Apache方案),不过虽然Linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过epoll则没有这个限制,它锁支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
2. IO效率不随FD数目增加而线性下降
传统的select/poll另外一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分socket是“活跃”的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈线性下降。但是epoll不存在这个问题,它只会对“活跃”的socket进行操作——这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有“活跃”的socket才会主动的去调用callback函数,其他idle状态socket则不会,在这点上,epoll实现了一个"伪"AIO,因为这时候推动力在os内核。在一些benchmark中,如果所有的socket基本上都是活跃的——比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。
3. 使用mmap加速内核与用户空间的消息传递
这点实际上涉及到epoll的具体实现了。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。而如果你像我一样从2.5内核就关注epoll的话,一定不会忘记手工mmap这一步的。
4. 内核微调
这一点其实不算epoll的优点了,而是整个Linux平台的优点。也许你可以怀疑Linux平台,但是你无法回避Linux平台赋予你微调内核的能力。比如,内核TCP/IP协议栈使用内存池管理sk_buff结构,那么可以在运行时期动态调整这个内存pool(skb_head_pool)的大小——通过echo XXXX> /proc/sys/net/core/hot_list_length完成。再比如listen函数的第2个参数(TCP完成3次握手的数据包队列长度),也可以根据你平台内存大小动态调整。更甚至在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的NAPI网卡驱动架构。
Using epoll() For Asynchronous Network Programming
http://kovyrin.net/2006/04/13/epoll-asynchronous-network-programming/
General way to implement tcp servers is “one thread/process per connection”. But on high loads this approach can be not so efficient and we need to use another patterns of connection handling. In this article I will describe how to implement tcp-server with synchronous connections handling using epoll() system call of Linux 2.6. kernel.
epoll is a new system call introduced in Linux 2.6. It is designed to replace the deprecated select (and also poll). Unlike these earlier system calls, which are O(n), epoll is an O(1) algorithm – this means that it scales well as the number of watched file descriptors increase. select uses a linear search through the list of watched file descriptors, which causes its O(n) behaviour, whereas epoll uses callbacks in the kernel file structure.
Another fundamental difference of epoll is that it can be used in an edge-triggered, as opposed to level-triggered, fashion. This means that you receive “hints” when the kernel believes the file descriptor has become ready for I/O, as opposed to being told “I/O can be carried out on this file descriptor”. This has a couple of minor advantages: kernel space doesn’t need to keep track of the state of the file descriptor, although it might just push that problem into user space, and user space programs can be more flexible (e.g. the readiness change notification can just be ignored).
To use epoll method you need to make following steps in your application:
- Create specific file descriptor for epoll calls:
epfd = epoll_create(EPOLL_QUEUE_LEN);
where EPOLL_QUEUE_LEN is the maximum number of connection descriptors you expect to manage at one time. The return value is a file descriptor that will be used in epoll calls later. This descriptor can be closed with close() when you do not longer need it.
- After first step you can add your descriptors to epoll with following call:
static struct epoll_event ev; int client_sock; ... ev.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP; ev.data.fd = client_sock; int res = epoll_ctl(epfd, EPOLL_CTL_ADD, client_sock, &ev);
where ev is epoll event configuration sctucture, EPOLL_CTL_ADD – predefined command constant to add sockets to epoll. Detailed description of epoll_ctl flags can be found in epoll_ctl(2)man page. When client_sock descriptor will be closed, it will be automatically deleted from epoll descriptor.
- When all your descriptors will be added to epoll, your process can idle and wait to something to do with epoll’ed sockets:
while (1) { // wait for something to do... int nfds = epoll_wait(epfd, events, MAX_EPOLL_EVENTS_PER_RUN, EPOLL_RUN_TIMEOUT); if (nfds < 0) die("Error in epoll_wait!"); // for each ready socket for(int i = 0; i < nfds; i++) { int fd = events[i].data.fd; handle_io_on_socket(fd); } }
Typical architecture of your application (networking part) is described below. This architecture allow almost unlimited scalability of your application on single and multi-processor systems:
- Listener – thread that performs bind() and listen() calls and waits for incoming conncetions. Then new connection arrives, this thread can do accept() on listening socket an send accepted connection socket to one of the I/O-workers.
- I/O-Worker(s) – one or more threads to receive connections from listener and to add them to epoll. Main loop of the generic I/O-worker looks like last step of epoll using pattern described above.
- Data Processing Worker(s) – one or more threads to receive data from and send data to I/O-workers and to perform data processing.
As you can see, epoll() API is very simple but believe me, it is very powerful. Linear scalability allows you to manage huge amounts of parallel connections with small amout of worker processes comparing to classical one-thread per connection.
If you want to read more about epoll or you want to look at some benchmarks, you can visit epoll Scalability Web Page at Sourceforge. Another interesting resources are:
- The C10K problem: a most known page about handling many connections and various I/O paradigms including epoll().
- libevent: high-level event-handling library ontop of the epoll. This page contains some information about performance tests of epoll.
(一)select()函数
原型如下:
各个参数含义如下:
- int fdsp1:最大描述符值 + 1
- fd_set *readfds:对可读感兴趣的描述符集
- fd_set *writefds:对可写感兴趣的描述符集
- fd_set *errorfds:对出错感兴趣的描述符集
- struct timeval *timeout:超时时间(注意:对于linux系统,此参数没有const限制,每次select调用完毕timeout的值都被修改为剩余时间,而unix系统则不会改变timeout值)
select函数会在发生以下情况时返回:
- readfds集合中有描述符可读
- writefds集合中有描述符可写
- errorfds集合中有描述符遇到错误条件
- 指定的超时时间timeout到了
当select返回时,描述符集合将被修改以指示哪些个描述符正处于可读、可写或有错误状态。可以用FD_ISSET宏对描述符进行测试以找到状态变化的描述符。如果select因为超时而返回的话,所有的描述符集合都将被清空。
select函数返回状态发生变化的描述符总数。返回0意味着超时。失败则返回-1并设置errno。可能出现的错误有:EBADF(无效描述符)、EINTR(因终端而返回)、EINVAL(nfds或timeout取值错误)。
设置描述符集合通常用如下几个宏定义:
2 FD_SET(int fd, fd_set *fdset); /* turn on the bit for fd in fd_set */
3 FD_CLR(int fd, fd_set *fdset); /* turn off the bit for fd in fd_set */
4 int FD_ISSET(int fd, fd_set *fdset); /* is the bit for fd on in fdset? */
如:
2 FD_ZERO(&rset); /* initialize the set: all bits off */
3 FD_SET(1, &rset); /* turn on bit for fd 1 */
4 FD_SET(4, &rset); /* turn on bit for fd 4 */
5 FD_SET(5, &rset); /* turn on bit for fd 5 */
当select返回的时候,rset位都将被置0,除了那些有变化的fd位。
当发生如下情况时认为是可读的:
- socket的receive buffer中的字节数大于socket的receive buffer的low-water mark属性值。(low-water mark值类似于分水岭,当receive buffer中的字节数小于low-water mark值的时候,认为socket还不可读,只有当receive buffer中的字节数达到一定量的时候才认为socket可读)
- 连接半关闭(读关闭,即收到对端发来的FIN包)
- 发生变化的描述符是被动套接字,而连接的三路握手完成的数量大于0,即有新的TCP连接建立
- 描述符发生错误,如果调用read系统调用读套接字的话会返回-1。
当发生如下情况时认为是可写的:
- socket的send buffer中的字节数大于socket的send buffer的low-water mark属性值以及socket已经连接或者不需要连接(如UDP)。
- 写半连接关闭,调用write函数将产生SIGPIPE
- 描述符发生错误,如果调用write系统调用写套接字的话会返回-1。
注意:
select默认能处理的描述符数量是有上限的,为FD_SETSIZE的大小。
对于timeout参数,如果置为NULL,则表示wait forever;若timeout->tv_sec = timeout->tv_usec = 0,则表示do not wait at all;否则指定等待时间。
如果使用select处理多个套接字,那么需要使用一个数组(也可以是其他结构)来记录各个描述符的状态。而使用poll则不需要,下面看poll函数。
(二)poll()函数
原型如下:
各参数含义如下:
- struct pollfd *fdarray:一个结构体,用来保存各个描述符的相关状态。
- unsigned long nfds:fdarray数组的大小,即里面包含有效成员的数量。
- int timeout:设定的超时时间。(以毫秒为单位)
poll函数返回值及含义如下:
- -1:有错误产生
- 0:超时时间到,而且没有描述符有状态变化
- >0:有状态变化的描述符个数
着重讲fdarray数组,因为这是它和select()函数主要的不同的地方:
pollfd的结构如下:
2 int fd; /* descriptor to check */
3 short events; /* events of interest on fd */
4 short revents; /* events that occured on fd */
5 };
其实poll()和select()函数要处理的问题是相同的,只不过是不同组织在几乎相同时刻同时推出的,因此才同时保留了下来。select()函数把可读描述符、可写描述符、错误描述符分在了三个集合里,这三个集合都是用bit位来标记一个描述符,一旦有若干个描述符状态发生变化,那么它将被置位,而其他没有发生变化的描述符的bit位将被clear,也就是说select()的readset、writeset、errorset是一个value-result类型,通过它们传值,而也通过它们返回结果。这样的一个坏处是每次重新select 的时候对集合必须重新赋值。而poll()函数则与select()采用的方式不同,它通过一个结构数组保存各个描述符的状态,每个结构体第一项fd代表描述符,第二项代表要监听的事件,也就是感兴趣的事件,而第三项代表poll()返回时描述符的返回状态。合法状态如下:
- POLLIN: 有普通数据或者优先数据可读
- POLLRDNORM: 有普通数据可读
- POLLRDBAND: 有优先数据可读
- POLLPRI: 有紧急数据可读
- POLLOUT: 有普通数据可写
- POLLWRNORM: 有普通数据可写
- POLLWRBAND: 有紧急数据可写
- POLLERR: 有错误发生
- POLLHUP: 有描述符挂起事件发生
- POLLNVAL: 描述符非法
对于POLLIN | POLLPRI等价与select()的可读事件;POLLOUT | POLLWRBAND等价与select()的可写事件;POLLIN 等价与POLLRDNORM | POLLRDBAND,而POLLOUT等价于POLLWRBAND。如果你对一个描述符的可读事件和可写事件以及错误等事件均感兴趣那么你应该都进行相应的设置。
对于timeout的设置如下:
- INFTIM: wait forever
- 0: return immediately, do not block
- >0: wait specified number of milliseconds
对于select()和poll()函数的讲解暂时到此。 更多细节请参考下面这篇博文:http://www.cppblog.com/just51living/archive/2011/07/28/151995.html
参考代码
- #include <sys/time.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <stdio.h>
- #include <fcntl.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <unistd.h>
- #include <poll.h>
- #include <sys/epoll.h>
- #define SERV_PORT 5060
- #define SERVER_IP "192.168.2.35"
- #define MAX 2
- #define USE_SELECT
- #define USE_POLL
- int readfd = 0;
- int creat_udp_socket();
- int main(int argc, char **argv)
- {
- struct sockaddr_in cliaddr; /* IPv4套接口地址定义*/
- int addr_len = sizeof(struct sockaddr_in);
- char recvbuff[600] = {0};
- int lenth = 0;
- fd_set rfds;
- struct timeval tv;
- int retval;
- int count = 0;
- if( creat_udp_socket() != NULL )
- {
- printf("creat_udp_socket error! ");
- return 0;
- }
- printf("readfd:%d ",readfd);
- /************** epoll举例 ****************/
- #if 1
- int epfd;
- int nfds = 0;
- int loop = 0;
- struct epoll_event ev;
- struct epoll_event events[MAX];
- epfd = epoll_create(MAX);
- if( epfd < 0 )
- {
- printf("epoll_create error! ");
- return 0;
- }
- /*ev.events = EPOLLIN|EPOLLET;*/
- /* 监听读事件 */
- ev.events = EPOLLIN;
- /* 设置监听的句柄 */
- ev.data.fd = readfd;
- /* 注册epoll事件 */
- epoll_ctl(epfd,EPOLL_CTL_ADD,readfd,&ev);
- while(1)
- {
- /* 等待epoll事件的发生 */
- nfds=epoll_wait(epfd, events, 200, -1);
- /* 如果没有数据可读,相当与轮询,将会消耗大量的CPU资源 */
- nfds = epoll_wait(epfd, events, 200, 0);
- /* 处理所发生的所有事件 */
- for( loop = 0; loop < nfds; loop++ )
- {
- if( events[loop].events & EPOLLIN )
- {
- lenth = recvfrom (events[loop].data.fd, recvbuff, sizeof(recvbuff), 0, (struct sockaddr *)&cliaddr, &addr_len);
- if (lenth <= 0 )
- {
- printf("receve data errer! ");
- }
- /*printf("%s",recvbuff);*/
- printf("%d ",count++);
- }
- }
- /*sleep(1);*/
- }
- close(readfd);
- close(epfd);
- #endif
- /************** poll举例 ****************/
- #if 0
- struct pollfd poll_list[MAX];
- int loop = 0;
- for( loop = 0; loop < MAX; loop++ )
- {
- poll_list[loop].fd = -1;
- poll_list[loop].events = NULL;
- }
- poll_list[0].fd = readfd;
- poll_list[0].events = POLLIN;
- while(1)
- {
- retval = poll(poll_list,MAX,-1);
- if( retval > 0 )
- {
- if( poll_list[0].revents & POLLIN )
- {
- lenth = recvfrom (readfd,recvbuff,sizeof(recvbuff),0,(struct sockaddr *)&cliaddr,&addr_len);
- if (lenth <= 0 )
- {
- printf("receve data errer! ");
- }
- /*printf("%s",recvbuff);*/
- printf("%d ",count++);
- }
- }
- sleep(1);
- }
- #endif
- /************** select举例 ****************/
- #if 0
- while(1)
- {
- /* Watch stdin (fd 0) to see when it has input. */
- FD_ZERO(&rfds);
- FD_SET(readfd, &rfds);
- /* Wait up to five seconds. */
- tv.tv_sec = 200;
- tv.tv_usec = 0;
- /*retval = select (FD_SETSIZE, &rfds, NULL, NULL, &tv);*/
- /*retval = select (FD_SETSIZE, &rfds, NULL, NULL, NULL);*/
- retval = select (readfd+1, &rfds, NULL, NULL, NULL);
- if( retval > 0 )
- {
- if( FD_ISSET(readfd, &rfds) )
- {
- lenth = recvfrom (readfd,recvbuff,sizeof(recvbuff),0,(struct sockaddr *)&cliaddr,&addr_len);
- if (lenth <= 0 )
- {
- printf("receve data errer! ");
- }
- /*printf("%s",recvbuff);*/
- printf("%d ",count++);
- }
- }
- }
- #endif
- /*
- while(1)
- {
- lenth = recvfrom (readfd,recvbuff,sizeof(recvbuff),0,(struct sockaddr *)&cliaddr,&addr_len);
- if (lenth <= 0 )
- {
- printf("receve data errer! ");
- }
- else
- {
- printf ("recever string: %s",recvbuff);
- }
- }
- sleep(20);
- */
- return 0;
- }
- int creat_udp_socket()
- {
- struct sockaddr_in servaddr, cliaddr; /* IPv4套接口地址定义*/
- int addr_len = sizeof(struct sockaddr_in);
- bzero(&servaddr, sizeof(servaddr)); /* 地址结构清零 */
- servaddr.sin_family = AF_INET; /* IPv4协议 */
- /*servaddr.sin_addr.s_addr = inet_addr (SERVER_IP);*/ /* 指定地址 */
- servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
- servaddr.sin_port = htons(SERV_PORT);
- readfd = socket (AF_INET, SOCK_DGRAM, 0); /* 建立UDP套接字*/
- if(readfd < 0)
- {
- printf("socket errer! ");
- return -1;
- }
- /*分配协议地址,绑定端口*/
- if( bind(readfd, (struct sockaddr *)&servaddr, sizeof (struct sockaddr_in)) == -1 )
- {
- printf("bind errer! ");
- return -1;
- }
- return 0;
- }
前言
本章节是用基本的Linux/Unix基本函数加上poll调用编写一个完整的服务器和客户端例子,可在Linux(ubuntu)和Unix(freebsd)上运行,客户端和服务端的功能如下:
客户端从标准输入读入一行,发送到服务端
服务端从网络读取一行,然后输出到客户端
客户端收到服务端的响应,输出这一行到标准输出
服务端
代码如下:
#include <unistd.h>
#include <sys/types.h> /* basic system data types */
#include <sys/socket.h> /* basic socket definitions */
#include <netinet/in.h> /* sockaddr_in{} and other Internet defns */
#include <arpa/inet.h> /* inet(3) functions */ #include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h> #include <poll.h> /* poll function */
#include <limits.h> #define MAXLINE 10240 #ifndef OPEN_MAX
#define OPEN_MAX 40960
#endif void handle(struct pollfd* clients, int maxClient, int readyClient); int main(int argc, char **argv)
{
int servPort = 6888;
int listenq = 1024;
int listenfd, connfd;
struct pollfd clients[OPEN_MAX];
int maxi;
socklen_t socklen = sizeof(struct sockaddr_in);
struct sockaddr_in cliaddr, servaddr;
char buf[MAXLINE];
int nready; bzero(&servaddr, socklen);
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(servPort); listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0) {
perror("socket error");
} int opt = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
perror("setsockopt error");
} if(bind(listenfd, (struct sockaddr *) &servaddr, socklen) == -1) {
perror("bind error");
exit(-1);
}
if (listen(listenfd, listenq) < 0) {
perror("listen error");
} clients[0].fd = listenfd;
clients[0].events = POLLIN;
int i;
for (i = 1; i< OPEN_MAX; i++)
clients[i].fd = -1;
maxi = listenfd + 1; printf("pollechoserver startup, listen on port:%d\n", servPort);
printf("max connection is %d\n", OPEN_MAX); for ( ; ; ) {
nready = poll(clients, maxi + 1, -1);
//printf("nready is %d\n", nready);
if (nready == -1) {
perror("poll error");
}
if (clients[0].revents & POLLIN) {
connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &socklen);
sprintf(buf, "accept form %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
printf(buf, ""); for (i = 0; i < OPEN_MAX; i++) {
if (clients[i].fd == -1) {
clients[i].fd = connfd;
clients[i].events = POLLIN;
break;
}
} if (i == OPEN_MAX) {
fprintf(stderr, "too many connection, more than %d\n", OPEN_MAX);
close(connfd);
continue;
} if (i > maxi)
maxi = i; --nready;
} handle(clients, maxi, nready);
}
} void handle(struct pollfd* clients, int maxClient, int nready) {
int connfd;
int i, nread;
char buf[MAXLINE]; if (nready == 0)
return; for (i = 1; i< maxClient; i++) {
connfd = clients[i].fd;
if (connfd == -1)
continue;
if (clients[i].revents & (POLLIN | POLLERR)) {
nread = read(connfd, buf, MAXLINE);//读取客户端socket流
if (nread < 0) {
perror("read error");
close(connfd);
clients[i].fd = -1;
continue;
}
if (nread == 0) {
printf("client close the connection");
close(connfd);
clients[i].fd = -1;
continue;
} write(connfd, buf, nread);//响应客户端
if (--nready <= 0)//没有连接需要处理,退出循环
break;
}
}
}
下载和编译
编译和启动服务端
gcc pollechoserver.c -o pollechoserver
./pollechoserver
至于客户端可以参考本文的Linux/Unix服务端和客户端Socket编程入门实例的echoclient例子下载编译。
Linux 异步IO介绍
http://my.oschina.net/sundq/blog/187249
epoll
epoll
是Linux对select
功能的改进,其性能大大提升,而且和监控的IO个数无关。
API:
epoll_create:
***`int epoll_create(int size);`*** 创建一个`epoll`实例,`size`参数是可监控IO的数量大小,但是Linux 2.6.8之后已不再使用。
epoll_ctl:
***`int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);`*** 对`epoll`实例进行操作,由`op`指定操作类型:
EPOLL_CTL_ADD:添加一个文件描述符到
epoll
实例中。EPOLL_CTL_MOD:更新
epoll
实例中文件描述符。EPOLL_CTL_DEL:删除
epoll
实例中文件描述符。epoll_wait
***```int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);```*** 等待已经准备好的IO.`events`参数返回的数据就是在`epoll_ctl`函数第三个参数设置的值。
限制:无
使用范例:
epoll
最多的用途就是socket编程,可以大大提高服务器的性能,此处我们实现一个简单的http服务器。#define MAXFDS 128
#define EVENTS 100
#define PORT 8080
#define MAXEPOLLSIZE 1024*10 typedef enum
{
false,
true
}bool; /***************定义处理socket的回调函数类型***********/
typedef int (*socket_pro)(int fd,void *data); /***************定义回调函数的用户数据***********/
typedef struct userdata
{
int fd;
socket_pro cb;
}userdata_t; static int epfd;//epoll句柄 /***************发送一个文件数据的函数***********/
static void cws_client_request (int connfd,void *data)
{
struct epoll_event ev = {0};
char buffer[1024*8] = {0};
int ret;
char *requestPath = NULL;
char tmpPath[512] = {"./www/"};
int pagesize = 0;
ret = recv (connfd, buffer, sizeof (buffer) -1, 0);
if (ret > 0)
{
if (strncmp (buffer, "GET ", 4) != 0)
{
printf("bad request.\n");
}
if (strncmp (buffer, "GET /", 5) == 0)
{ if (strncmp (buffer, "GET / ", 6) == 0)
{
strcat(tmpPath, "/index.html");
requestPath = tmpPath;
}
else
{
requestPath = buffer+5;
char * pos = strstr(buffer+5, " ");
strncat(tmpPath, requestPath, pos - requestPath);
requestPath = tmpPath;
}
}
char * badRequest = (char *)"<b>Error 404 Not Found.</b>";
char * httpStatus200 = (char *)"HTTP/1.0 200 OK\r\nContent-Type:text/html\r\n\r\n"; FILE * fp = fopen(requestPath, "r");
FILE * connfp = fdopen(connfd, "w");
if ( connfp == NULL )
{
perror("fdopen error");//cout <<"bad connfp"<<endl;
}
if (fp == NULL)
{
setlinebuf(connfp);
fwrite(badRequest, strlen(badRequest), 1, connfp);
fclose(connfp);
}
else
{
setlinebuf(connfp);
//fwrite(httpStatus200, strlen(httpStatus200), 1, connfp);
//fflush(connfp);
while ((ret = fread (buffer, 1, sizeof(buffer) -1, fp)) > 0)
{
fwrite(buffer, 1, ret, connfp);
pagesize += ret;
fflush(connfp);
}
printf("pagesize:%d\n",pagesize);//cout <<"pagesize:" << pagesize << endl;
fclose(fp);
}
} //1
close(connfd);
epoll_ctl (epfd, EPOLL_CTL_DEL, connfd, &ev);
} /***************处理已经连接socket函数***********/
static int __process_data_fd(int fd,void *data)
{
struct epoll_event *ev = (struct epoll_event *)data;
cws_client_request(fd,ev);
free(data); return;
} /***************处理监听socket函数***********/
static int __process_listen_fd(int fd,void *data)
{
struct sockaddr_in caddr = {0};
struct epoll_event ev = {0};
int len = sizeof (caddr);
int cfd = accept (fd, (struct sockaddr *) &caddr, (socklen_t *) & len);
if (-1 == cfd)
{
perror("accpet error");//cout << "server has an error, now accept a socket fd" <<endl;
break;
}
setNonBlock (cfd); userdata_t *cb_data = malloc(sizeof(userdata_t));
cb_data->fd = cfd;
cb_data->cb = __process_data_fd; ev.data.ptr = cfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl (epfd, EPOLL_CTL_ADD, cfd, &ev); return 0;
} /***************设置描述符为非阻塞***********/
static bool setNonBlock (int fd)
{
int flags = fcntl (fd, F_GETFL, 0);
flags |= O_NONBLOCK;
if (-1 == fcntl (fd, F_SETFL, flags))
{
return false;
}
return true;
} /***************主函数***********/
int main (int argc, char *argv[])
{
int listen_fd, nfds;
int on = 1;
char buffer[512];
struct sockaddr_in saddr, caddr;
struct epoll_event ev, events[EVENTS]; signal(SIGPIPE, SIG_IGN); if (fork())
{
exit(0);
} if (-1 == (listen_fd = socket(AF_INET, SOCK_STREAM, 0)))
{
perror("socket error");//cout << "create socket error!" << endl;
return -1;
} epfd = epoll_create (MAXFDS);
setsockopt (listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on));
bzero (&saddr, sizeof (saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons ((short) (PORT));
saddr.sin_addr.s_addr = INADDR_ANY;
if (-1 == bind(listen_fd, (struct sockaddr *) &saddr, sizeof (saddr)))
{
perror("bind error");//cout << " cann't bind socket on server " << endl;
return -1;
} if (-1 == listen (listen_fd, 32))
{
perror("listen error");//cout << "listen error" << endl;
return -1;
}
userdata_t *cb_data = (userdata_t *)malloc(sizeof(userdata_t));
cb_data->fd = listen_fd;
cb_data->cb = __process_listen_fd;
ev.data.ptr = cb_data;
ev.events = EPOLLIN|EPOLLET;
epoll_ctl (epfd, EPOLL_CTL_ADD, listen_fd, &ev);
for (;;)
{
int i;
nfds = epoll_wait (epfd, events, MAXFDS, -1);
for (i = 0; i < nfds; ++i)
{
userdata_t *cb_data = (userdata_t *)events[i].data.ptr;
cb_data.cb(cb_data.fd,cb_data);
}
}
if (listen_fd > 0)
{
shutdown (listen_fd, SHUT_RDWR);
close (listen_fd);
} return 0;
}
linux native aio
Linux native aio 有两种API,一种是libaio提供的API,一种是利用系统调用封装成的API,后者使用的较多,因为不需要额外的库且简单。
API
io_setup:
是用来设置一个异步请求的上下文,第一个参数是请求事件的个数,第二个参数唯一标识一个异步请求。
io_commit:
是用来提交一个异步io请求的,在提交之前,需要设置一下结构体`iocb`,该结构体有以下字段需要设置:
aio_data:
:用户设置的数据,到时通过io_getevents
函数返回。aio_lio_opcode
:异步操作码,有以下几种操作:- `IOCB_CMD_PREAD`:读操作,相当于调用`pread`
- `IOCB_CMD_PWRITE`:写操作,相当于`pwrite`
- `IOCB_CMD_FSYNC`:同步操作,相当于调用`fsync`
- `IOCB_CMD_FDSYNC`:同步操作,相当于调用`fdatasync`
aio_buf
:用户提供的存储数据的bufferaio_offset
:文件的中偏移量aio_nbytes
:IO操作的数据大小aio_flags
:该字段要么不设置,要么设置为IOCB_FLAG_RESFD
,表示使用eventfd通知事件的完成。aio_resfd
:如果aio_flags
字段设置为IOCB_FLAG_RESFD
,该字段设置为eventfd
的返回值。
io_getevents:
用来获取完成的io事件,参数`min_nr`是事件个数的的最小值,`nr`是事件个数的最大值,如果没有足够的事件发生,该函数会阻塞, `timeout`参数是阻塞的超时时间。该函数会返回一个`io_events`结构体,该结构体有以下字段: * `data`:这就是在`struct iocb`结构体`aio_data`设置的值。
* `obj`:返回的`struct iocb`结构体,是`io_commit`第三个参数设置数组中的值。
* `res`和`res2`:返回结果。
io_destroy:
在所有时间处理完之后,调用此函数销毁异步io请求。
限制:
aio只能使用于常规的文件IO,不能使用于socket,管道等IO。
使用范例
上面已经介绍过了,io_getevents在调用之后会阻塞直到有足够的事件发生,因此要实现真正的异步IO,需要借助
eventfd
和epoll
达到目的。
首先我们封装一下系统调用来作为我们使用的API:
int io_setup(unsigned nr, aio_context_t *ctxp)
{
return syscall(__NR_io_setup, nr, ctxp);
} int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{
return syscall(__NR_io_submit, ctx, nr, iocbpp);
} int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
struct io_event *events, struct timespec *timeout)
{
return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
} int io_destroy(aio_context_t ctx)
{
return syscall(__NR_io_destroy, ctx);
} int eventfd2(unsigned int initval, int flags)
{
return syscall(__NR_eventfd2, initval, flags);
}
定义自己的异步用户数据和回调函数:
typedef void io_callback_t(aio_context_t ctx, struct iocb *iocb, long res);//回调函数类型 struct userdata//用户数据
{
int64_t offset;
int64_t filesize;
int64_t block_size;
}; struct user_iocb//封装结构体,以便异步返回用户数据。
{
struct iocb iocb;
struct userdata user_cb;
}; void aio_callback(aio_context_t ctx, struct iocb *iocb, long res, long res2)
{
int64_t offset = iocb->aio_offset;
struct custom_iocb *iocbp = (struct custom_iocb *) iocb;
printf("data=%.*s\n",page_size, (char *) iocb->aio_buf);
}
创建异步请求:
//异步读取整个文件数据
#define page_size 1024
char *path = "test.txt" //计算异步请求的个数
static int get_event_num(uint64_t len)
{
return len / page_size + (len % page_size != 0);
} static int get_filesize(char *path)
{
struct stat buf = {0};
int iret = stat(path, &buf);
return (iret >= 0) ? buf.st_size : iret;
}
int main(void)
{
int event_fd = 0;
int file_size = get_filesize(path);
int event_num = get_event_num(file_size);
struct iocb *iocbs = malloc(event_num * sizeof (struct iocb));
struct iocb * iocbps[event_num] = {0};
aio_context_t ctx = 0; /****************创建使用的eventfd******************/
event_fd = eventfd2(0, O_NONBLOCK | O_CLOEXEC); /****************创建异步请求上下文*****************/
if (io_setup(event_num, &ctx))
{
perror("io_setup");
return 4;
} /*****************设置请求的数据********************/
int fd = open(path, O_RDWR, 0664);
for (int i = 0; i < event_num; i++)
{
struct iocb *_ = iocbs + i;
_->aio_buf = (__u64) ((char *) buf + (i * page_size));//设置存储请求数据的buf
_->aio_fildes = fd;//请求的文件描述符
_->aio_offset = i * page_size;//读文件的偏移量,异步请求文件指针不会移动的,自己设定
_->aio_nbytes = page_size;//每个异步请求请求数据的大小
_->aio_resfd = event_fd;//用来通知有事件完成的eventfd
_->aio_flags = IOCB_FLAG_RESFD;//使用eventfd的标志
_->aio_data = (__u64) aio_callback;//设定用户数据,这里是个回调函数
iocbps[i] = _;
} /*****************提交异步请求********************/
if (io_submit(ctx, event_num, iocbps) != event_num)
{
perror("io_submit");
return 6;
} /***************利用epoll等待异步请求事件完成*****/
int epfd = 0;
epfd = epoll_create(1);
if (epfd == -1)
{
perror("epoll_create");
return 7;
}
struct epoll_event epevent = {0};
epevent.events = EPOLLIN | EPOLLET;
epevent.data.ptr = NULL;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, event_fd, &epevent))
{
perror("epoll_ctl");
return 8;
} /*****************处理异步请求事件********/
int i = 0;
while (i < event_num)
{
int64_t finished_aio = 0;
if (epoll_wait(epfd, &epevent, 1, -1) != 1)//等待事件发生
{
perror("epoll_wait");
return 9;
} //读取异步事件完成的个数,finished_aio接收
if (read(event_fd, &finished_aio, sizeof (finished_aio)) != sizeof (finished_aio))
{
perror("read");
return 10;
}
printf("finished io number: %"PRIu64"\n", finished_aio); struct timespec tms;
struct io_event events[event_num];
while (finished_aio > 0)
{
tms.tv_sec = 0;
tms.tv_nsec = 0;
int r = io_getevents(ctx, 1, event_num, events, &tms);//获取发生的事件
if (r > 0)
{
int j = 0;
for (j = 0; j < r; ++j)
{
((io_callback_t *) (events[j].data))(ctx, (struct iocb *) events[j].obj, events[j].res, events[j].res2);//调用用户callback
}
i += r;
finished_aio -= r;
}
}
} /*****************处理异步事件结束********/
close(epfd);
io_destroy(ctx);
close(fd);
close(event_fd); return 0; }
SIGIO
在Linux下,每一个文件描述符都可以设置O_ASYNC
标识,当文件可读或者可写时可以发送信号通知相关进程,默认信号是SIGIO
,可以
使用fcntl
函数改变这个默认信号。
限制:
在多个文件描述符的情况下,信号驱动模式不能确定哪个描述符是可以读或者写的,所以必须遍历每一个描述符来判断,一个
好的方法是使用
epoll
获取哪个描述符可读或者写。只能用于socket、管道和终端IO,不能用于普通文件的IO。
使用范例
设置描述符的
O_ASYNC
标识和描述符的归属进程。设置文件描述符为非阻塞。
设置信号
SIGIO
的处理动作。void new_op(int signum, siginfo_t *info, void *myact)//信号处理函数的实现
{
int i;
for (i = 0; i < 1; i++)
{
printf("data:%u\n", info->si_int);
}
char buf[10] = {0};
while(read(STDIN_FILENO,buf,10) > 0)
{
printf("input:%s",buf);
}
printf("handle signal %d over;\n", signum);
} int main(void)
{
int oflags;
struct sigaction act; sigemptyset(&act.sa_mask);
act.sa_sigaction = new_op; //信号处理函数
if (sigaction(SIGPOLL, &act, NULL) < 0)
{
printf("install sigal error\n");
} fcntl(STDIN_FILENO, F_SETOWN, getpid());
oflags = fcntl(STDIN_FILENO, F_GETFL);
fcntl(STDIN_FILENO, F_SETFL, oflags | FASYNC|O_NONBLOCK); while (1){
sleep(1);
};
}