epoll有两种触发的方式即LT(水平触发)和ET(边缘触发)两种,在前者,只要存在着事件就会不断的触发,直到处理完成,而后者只触发一次相同事件或者说只在从非触发到触发两个状态转换的时候儿才触发。

这会出现下面一种情况,如果是多线程在处理,一个SOCKET事件到来,数据开始解析,这时候这个SOCKET又来了同样一个这样的事件,而你的数据解析尚未完成,那么程序会自动调度另外一个线程或者进程来处理新的事件,这造成一个很严重的问题,不同的线程或者进程在处理同一个SOCKET的事件,这会使程序的健壮性大降低而编程的复杂度大大增加!!即使在ET模式下也有可能出现这种情况!!

解决这种现象有两种方法:

第一种方法是在单独的线程或进程里解析数据,也就是说,接收数据的线程接收到数据后立刻将数据转移至另外的线程。

第二种方法就是本文要提到的EPOLLONESHOT这种方法,可以在epoll上注册这个事件,注册这个事件后,如果在处理写成当前的SOCKET后不再重新注册相关事件,那么这个事件就不再响应了或者说触发了。要想重新注册事件则需要调用epoll_ctl重置文件描述符上的事件,这样前面的socket就不会出现竞态这样就可以通过手动的方式来保证同一SOCKET只能被一个线程处理,不会跨越多个线程。

看下面的代码:

void Eepoll::ResetOneShot(int  epollfd,SOCKET fd,bool bOne)

{

         epoll_eventevent;

         event.data.fd= fd;

         event.events= EPOLLIN | EPOLLET ;

         if(bOne)

         {

                   event.events |=EPOLLONESHOT;

         }

         if(-1 == epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&event))

         {

                   perror("resetoneshotepoll_ctl error!");

         }

}

  

这里有一个问题,在操作ET模式下的EPOLL时,对EPOLLONESHOT没有什么太大的注意点,但是在LT时,就有一些注意的了。

前面说过LT会不断触发,所以在处理数据时,不需要在RECV时不断的循环去读一直读到EAGAIN,但如果设置了EPOLLONESHOT后,也得如此办理,否则,就可能会丢掉数据。一个采用EPOLLONETSHOT的例子:

epoll_oneshot._server.cpp服务端程序:

#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>
#include<fcntl.h>
#include<stdlib.h>
#include<sys/epoll.h>
#include<pthread.h>
#include<iostream>
#define MAX_EVENT_NUMBER 1024//最大事件连接数
#define BUFFER_SIZE 1024//接收缓冲区大小
using namespace std;
struct fds{//文件描述符结构体,用作传递给子线程的参数
int epollfd;
int sockfd;
};
int setnonblocking(int fd){//设置文件描述符为非阻塞
int old_option=fcntl(fd,F_GETFL);
int new_option=old_option|O_NONBLOCK;
fcntl(fd,F_SETFL,new_option);
return old_option;
}
void addfd(int epollfd,int fd,bool oneshot){//为文件描述符添加事件
epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN|EPOLLET;
if(oneshot){//采用EPOLLONETSHOT事件
event.events|=EPOLLONESHOT;
}
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}
void reset_oneshot(int epollfd,int fd){//重置事件
epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN|EPOLLET|EPOLLONESHOT;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&event);
}
void* worker(void* arg){//工作者线程(子线程)接收socket上的数据并重置事件
int sockfd=((fds*)arg)->sockfd;
int epollfd=((fds*)arg)->epollfd;//事件表描述符从arg参数(结构体fds)得来
cout<<"start new thread to receive data on fd:"<<sockfd<<endl;
char buf[BUFFER_SIZE];
memset(buf,'\0',BUFFER_SIZE);//缓冲区置空
while(1){
int ret=recv(sockfd,buf,BUFFER_SIZE-1,0);//接收数据
if(ret==0){//关闭连接
close(sockfd);
cout<<"close "<<sockfd<<endl;
break;
}
else if(ret<0){
if(errno==EAGAIN){//并非网络出错,而是可以再次注册事件
reset_oneshot(epollfd,sockfd);
cout<<"reset epollfd"<<endl;
break;
}
}
else{
cout<<buf;
sleep(5);//采用睡眠是为了在5s内若有新数据到来则该线程继续处理,否则线程退出
}
}
cout<<"thread exit on fd:"<<sockfd;
//_exit(0);//这个会终止整个进程!!
return NULL;
}
int main(int argc,char* argv[]){
if(argc<=2){
cout<<"argc<=2"<<endl;
return 1;
}
const char* ip=argv[1];
int port=atoi(argv[2]);
int ret=0;
struct sockaddr_in address;
bzero(&address,sizeof(address));
address.sin_family=AF_INET;
inet_pton(AF_INET,ip,&address.sin_addr);
address.sin_port=htons(port);
int listenfd=socket(PF_INET,SOCK_STREAM,0);
assert(listenfd>=0);
ret=bind(listenfd,(struct sockaddr*)&address,sizeof(address));
assert(ret!=-1);
ret=listen(listenfd,5);
assert(ret!=-1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd=epoll_create(5);
assert(epollfd!=-1);
addfd(epollfd,listenfd,false);//不能将监听端口listenfd设置为EPOLLONESHOT否则会丢失客户连接
while(1){
int ret=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);//等待事件发生
if(ret<0){
cout<<"epoll error"<<endl;
break;
}
for(int i=0;i<ret;i++){
int sockfd=events[i].data.fd;
if(sockfd==listenfd){//监听端口
struct sockaddr_in client_address;
socklen_t client_addrlength=sizeof(client_address);
int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
addfd(epollfd,connfd,true);//新的客户连接置为EPOLLONESHOT事件
}
else if(events[i].events&EPOLLIN){//客户端有数据发送的事件发生
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epollfd=epollfd;
fds_for_new_worker.sockfd=sockfd;
pthread_create(&thread,NULL,worker,(void*)&fds_for_new_worker);//调用工作者线程处理数据
}
else{
cout<<"something wrong"<<endl;
}
}
}
close(listenfd);
return 0;
}

 

05-20 01:14