http_conn.h
#ifndef HTTPCONNECTION_H
#define HTTPCONNEVTION_H
#include<sys/epoll.h>
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<signal.h>
#include<sys/types.h>
#include<fcntl.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/stat.h>
#include<sys/mman.h>
#include<stdarg.h>
#include<errno.h>
#include"locker.h"
#include<sys/uio.h>
//解析HTTP请求报文
class http_conn{
public:
static int m_epollfd; //所有的socket上的事件都被注册到同一个epoll对象中,所有设置为静态的
static int m_user_count; //统计用户的数量
http_conn(){}
~http_conn(){}
void process(); //处理客户端的请求
void init(int sockfd,const sockaddr_in &addr);//初始化新接收的连接
void close_conn(); //关闭连接
bool read(); //非阻塞的读
bool write(); //非阻塞的写
private:
int m_sockfd;//该HTTP连接的socket
sockaddr_in m_address;//通信的socket地址
};
#endif
http_conn.cpp
#include"http_conn.h"
int http_conn::m_epollfd=-1;
int http_conn::m_user_count=0;
//设置文件描述符非阻塞
int setnonblocking(int fd){
int old_flag=fcntl(fd,F_GETFL);
int new_flag=old_flag | O_NONBLOCK;
fcntl(fd,F_SETFL,new_flag);
}
//添加文件描述符到epoll中
//向epoll中添加需要监听的文件描述符
void addfd(int epollfd,int fd,bool one_shot){
epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN | EPOLLRDHUP;
if(one_shot){
event.events | EPOLLONESHOT;
}
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
//设置文件描述符非阻塞
setnonblocking(fd);
}
//从epoll中删除文件描述符
void removefd(int epollfd,int fd){
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,0);
close(fd);
}
//修改文件描述符,重置socket上EPOLLONESHOT事件,以确保下一次可读时,EPOLLIN事件能被触发
void modfd(int epollfd,int fd,int ev){
epoll_event event;
event.data.fd=fd;
event.events=ev | EPOLLONESHOT | EPOLLRDHUP;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&event);
}
//初始化新接收的连接
//前面要加http_conn::,要加上类名
void http_conn::init(int sockfd,const sockaddr_in &addr){
m_sockfd=sockfd;
m_address=addr;
//设置一下端口复用,设置了才能加到epoll中
int reuse=1;//只有为1这个值才表示复用
setsockopt(m_sockfd,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));
//添加到epoll对象中
addfd(m_epollfd,sockfd,true);
m_user_count++;//总用户数加1
}
//关闭连接
void http_conn::close_conn(){
if(m_sockfd!=-1){
removefd(m_epollfd,m_sockfd);
m_sockfd=-1;
m_user_count--;//关闭一个连接,客户总数量减1
}
}
bool http_conn::read(){
printf("一次性读完数据\n");
return true;
}
bool http_conn::write(){
printf("一次性写完数据\n");
return true;
}
//由线程池中的工作线程调用,这是处理HTTP请求的入口函数
void http_conn::process(){
//解析HTTP请求,做业务逻辑
printf("parse request,create response");
//生成响应
}
locker.h
#ifndef LOCKER_H
#define LOCKER_H
#include<pthread.h>
#include<exception>
#include<semaphore.h>
//线程同步机制封装类
//互斥锁类
class locker{
public:
locker(){
if(pthread_mutex_init(&m_mutex,NULL)!=0){
throw std::exception();
}
}
~locker(){
pthread_mutex_destroy(&m_mutex);
}
bool lock(){
return pthread_mutex_lock(&m_mutex)==0;
}
bool unlock(){
return pthread_mutex_unlock(&m_mutex)==0;
}
pthread_mutex_t * get(){
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
//条件变量类
class cond{
public:
cond(){
if(pthread_cond_init(&m_cond,NULL)!=0){
throw std::exception();
}
}
~cond(){
pthread_cond_destroy(&m_cond);
}
bool wait(pthread_mutex_t * mutex){
return pthread_cond_wait(&m_cond,mutex)==0;
}
bool timewait(pthread_mutex_t * mutex,struct timespec t){
return pthread_cond_timedwait(&m_cond,mutex,&t)==0;
}
bool signal(){
return pthread_cond_signal(&m_cond)==0;
}
bool broadcast(){
return pthread_cond_broadcast(&m_cond)==0;
}
private:
pthread_cond_t m_cond;
};
//信号量类
class sem{
public:
sem(){
if(sem_init(&m_sem,0,0)!=0){
throw std::exception();
}
}
sem(int num){
if(sem_init(&m_sem,0,num)!=0){
throw std::exception();
}
}
~sem(){
sem_destroy(&m_sem);
}
//等待信号量
bool wait(){
return sem_wait(&m_sem)==0;
}
//增加信号量
bool post(){
return sem_post(&m_sem)==0;
}
private:
sem_t m_sem;
};
#endif
main.cpp
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<error.h>
#include<fcntl.h>
#include<sys/epoll.h>
#include<signal.h>
#include"locker.h"
#include"threadpool.h"
#include"http_conn.h"
#define MAX_FD 65535 //最大文件描述符个数
//虽然最大有这么多,但是跑起来支持不了这么多并发
#define MAX_EVENT_NUMBER 10000 //监听的最大的事件数量,过少的话程序性能不行
//添加信号捕捉
void addsig(int sig,void(handler)(int)){
struct sigaction sa;
memset(&sa,'\0',sizeof(sa));
sa.sa_handler=handler;
sigfillset(&sa.sa_mask);
sigaction(sig,&sa,NULL);
}
//添加文件描述符到epoll中
extern void addfd(int epollfd,int fd,bool one_shot);
//从epoll中删除文件描述符
extern void removefd(int epollfd,int fd);
//修改文件描述符
extern void modfd(int epollfd,int fd,int ev);
int main(int argc,char* argv[]){
if(argc<=1){
printf("按照如下格式运行:%s port_number\n",basename(argv[0]));
exit(-1);
}
//获取端口号
int port = atoi(argv[1]);
//对SIGPIE信号进行处理
addsig(SIGPIPE,SIG_IGN);
//创建线程池初始化信息
threadpool<http_conn> * pool=NULL;
try{
pool=new threadpool<http_conn>;
} catch(...){
exit(-1);
}//线程池都没创建好的话,直接退出
//当前main函数是主线程,任务类是http_conn这个类
//创建一个数组用于保存所有的客户端信息
http_conn * users=new http_conn[ MAX_FD ];
//写网络代码
int listenfd=socket(PF_INET,SOCK_STREAM,0);
//绑定前,设置端口复用
int reuse=1;//只有为1这个值才表示复用
setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));
//绑定,绑定监听的套接字
struct sockaddr_in address;
address.sin_family=AF_INET;
address.sin_addr.s_addr=INADDR_ANY;
address.sin_port=htons(port);
bind(listenfd,(struct sockaddr*)&address,sizeof(address));
//监听
listen(listenfd,5);
//监听下面写epoll
//创建epoll对象,事件数组,检测到之后把事件写到数组里面
epoll_event events[MAX_EVENT_NUMBER];
int epollfd=epoll_create(5);
//将监听的文件描述符添加到epoll对象中
addfd(epollfd,listenfd,false);
http_conn::m_epollfd=epollfd;
//主线程不断循环检测有无事件发生
while(1){
int num=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);//num表示检测到了几个事件
if((num<0)&&(errno!=EINTR)){
printf("epoll failure\n");
break;
}
//循环遍历事件数组
for(int i=0;i<num;i++){
int sockfd=events[i].data.fd;
if(sockfd==listenfd){
//有客户端连接进来
//需要连接客户端,用accept
struct sockaddr_in client_address;
socklen_t client_addrlen=sizeof(client_address);
int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlen);
if(http_conn::m_user_count>=MAX_FD){
//目前的连接数满了,
//给客户端写一个信息:服务器内部正忙
close(connfd);
continue;
}
//将新的客户的数据初始化,放到数组中
users[connfd].init(connfd,client_address);
}else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
//对方异常断开或者错误等事件
users[sockfd].close_conn();
}else if(events[i].events &EPOLLIN){
if(users[sockfd].read()){
//一次性把所有数据都读完
//交给工作线程去处理
pool->append(users + sockfd);
}else{
users[sockfd].close_conn();
}
}else if(events[i].events&EPOLLOUT){
if(!users[sockfd].write()){
//一次性写完所有数据
users[sockfd].close_conn();
}
}
}
}
//先实现流程,再去实现具体的方法
close(epollfd);
close(listenfd);
delete [] users;
delete pool;
return 0;
}
threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include<pthread.h>
#include<list>
#include<exception>
#include<cstdio>
#include"locker.h"
//线程池类,定义成模板类是为了代码的复用,模板参数T是任务类
template<typename T>
class threadpool{
public:
threadpool(int thread_number=8,int max_requests=10000);
~threadpool();
bool append(T* request);
private:
static void* worker(void * arg);
void run();
private:
//线程的数量
int m_thread_number;
//线程池数组,大小为m_thread_number
pthread_t * m_threads;
//请求队列中最多允许的,等待处理的请求数量
int m_max_requests;
//请求队列
std::list< T*> m_workqueue;
//互斥锁
locker m_queuelocker;
//信号量用来判断是否有任务需要处理
sem m_queuestat;
//是否结束线程
bool m_stop;
};
template<typename T>
threadpool<T>::threadpool(int thread_number,int max_requests) :
m_thread_number(thread_number),m_max_requests(max_requests),
m_stop(false),m_threads(NULL){
if((thread_number <=0)||(max_requests<=0)){
throw std::exception();
}
m_threads=new pthread_t[m_thread_number];
if(!m_threads){
throw std::exception();
}
//创建thread_number个线程,并将它们设置为线程脱离
for(int i=0;i<thread_number;i++){
printf("create the %dth thread\n",i);
if(pthread_create(m_threads + i,NULL,worker,this)!=0){
delete [] m_threads;
throw std::exception();
}
if(pthread_detach(m_threads[i])){
delete[] m_threads;
throw std::exception();
}
}
}
template<typename T>
threadpool<T>::~threadpool(){
delete[] m_threads;
m_stop=true;
}
template<typename T>
bool threadpool<T>::append(T *request){
m_queuelocker.lock();
if(m_workqueue.size()>m_max_requests){
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template<typename T>
void* threadpool<T>::worker(void *arg){
threadpool * pool=(threadpool *)arg;
pool->run();
return pool;
}
template<typename T>
void threadpool<T>::run(){
while(!m_stop){
m_queuestat.wait();
m_queuelocker.lock();
if(m_workqueue.empty()){
m_queuelocker.unlock();
continue;
}
T* request=m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if(!request){
continue;
}
request->process();
}
}
#endif
编译并创建线程池
编译所有cpp文件
g++ *.cpp -pthread
运行该程序并设置一个端口号10000
创建了8个线程
先记一下虚拟机的ip
192.168.227.129
去浏览器中用http访问一下