http_conn.h

#ifndef HTTPCONNECTION_H
#define HTTPCONNEVTION_H
#include<sys/epoll.h>
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<signal.h>
#include<sys/types.h>
#include<fcntl.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/stat.h>
#include<sys/mman.h>
#include<stdarg.h>
#include<errno.h>
#include"locker.h"
#include<sys/uio.h>

//解析HTTP请求报文
class http_conn{
public:
    static int m_epollfd; //所有的socket上的事件都被注册到同一个epoll对象中,所有设置为静态的
    static int m_user_count; //统计用户的数量

    http_conn(){}
    ~http_conn(){}

    void process(); //处理客户端的请求
    void init(int sockfd,const sockaddr_in &addr);//初始化新接收的连接
    void close_conn(); //关闭连接
    bool read(); //非阻塞的读
    bool write(); //非阻塞的写

private:
    int m_sockfd;//该HTTP连接的socket
    sockaddr_in m_address;//通信的socket地址
};



#endif

http_conn.cpp

#include"http_conn.h"

int http_conn::m_epollfd=-1; 
int http_conn::m_user_count=0;
//设置文件描述符非阻塞
int setnonblocking(int fd){
    int old_flag=fcntl(fd,F_GETFL);
    int new_flag=old_flag | O_NONBLOCK;
    fcntl(fd,F_SETFL,new_flag);
}

//添加文件描述符到epoll中
//向epoll中添加需要监听的文件描述符
void addfd(int epollfd,int fd,bool one_shot){
    epoll_event event;
    event.data.fd=fd;
    event.events=EPOLLIN | EPOLLRDHUP;
    if(one_shot){
        event.events | EPOLLONESHOT;
    }
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
    //设置文件描述符非阻塞
    setnonblocking(fd);

}

//从epoll中删除文件描述符
void removefd(int epollfd,int fd){
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,0);
    close(fd);
}

//修改文件描述符,重置socket上EPOLLONESHOT事件,以确保下一次可读时,EPOLLIN事件能被触发
void modfd(int epollfd,int fd,int ev){
    epoll_event event;
    event.data.fd=fd;
    event.events=ev | EPOLLONESHOT | EPOLLRDHUP;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&event);
}

//初始化新接收的连接
//前面要加http_conn::,要加上类名
void http_conn::init(int sockfd,const sockaddr_in &addr){
    m_sockfd=sockfd;
    m_address=addr;

    //设置一下端口复用,设置了才能加到epoll中
    int reuse=1;//只有为1这个值才表示复用
    setsockopt(m_sockfd,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));

    //添加到epoll对象中
    addfd(m_epollfd,sockfd,true);
    m_user_count++;//总用户数加1
}

//关闭连接
void http_conn::close_conn(){
    if(m_sockfd!=-1){
        removefd(m_epollfd,m_sockfd);
        m_sockfd=-1;
        m_user_count--;//关闭一个连接,客户总数量减1
    }
}


bool http_conn::read(){
    printf("一次性读完数据\n");
    return true;
}

bool http_conn::write(){
    printf("一次性写完数据\n");
    return true;
}

//由线程池中的工作线程调用,这是处理HTTP请求的入口函数
void http_conn::process(){

    //解析HTTP请求,做业务逻辑

    printf("parse request,create response");

    //生成响应
}

locker.h

#ifndef LOCKER_H
#define LOCKER_H

#include<pthread.h>
#include<exception>
#include<semaphore.h>

//线程同步机制封装类

//互斥锁类
class locker{
public:
    locker(){
        if(pthread_mutex_init(&m_mutex,NULL)!=0){
            throw std::exception();
        }
    }

    ~locker(){
        pthread_mutex_destroy(&m_mutex);
    }

    bool lock(){
        return pthread_mutex_lock(&m_mutex)==0;
    }

    bool unlock(){
        return pthread_mutex_unlock(&m_mutex)==0;
    }

    pthread_mutex_t * get(){
        return &m_mutex;
    }
private:
    pthread_mutex_t m_mutex;
};

//条件变量类
class cond{
public:
    cond(){
        if(pthread_cond_init(&m_cond,NULL)!=0){
            throw std::exception();
        }
    }

    ~cond(){
        pthread_cond_destroy(&m_cond);
    }

    bool wait(pthread_mutex_t * mutex){
        return pthread_cond_wait(&m_cond,mutex)==0;
    }

    bool timewait(pthread_mutex_t * mutex,struct timespec t){
        return pthread_cond_timedwait(&m_cond,mutex,&t)==0;
    }

    bool signal(){
        return pthread_cond_signal(&m_cond)==0;
    }

    bool broadcast(){
        return pthread_cond_broadcast(&m_cond)==0;
    }
private:
    pthread_cond_t m_cond;
};

//信号量类
class sem{
public:
    sem(){
        if(sem_init(&m_sem,0,0)!=0){
            throw std::exception();
        }
    }

    sem(int num){
        if(sem_init(&m_sem,0,num)!=0){
            throw std::exception();
        }
    }

    ~sem(){
        sem_destroy(&m_sem);
    }

    //等待信号量
    bool wait(){
        return sem_wait(&m_sem)==0;
    }

    //增加信号量
    bool post(){
        return sem_post(&m_sem)==0;
    }
private:
    sem_t m_sem;
};
#endif

main.cpp

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<error.h>
#include<fcntl.h>
#include<sys/epoll.h>
#include<signal.h>
#include"locker.h"
#include"threadpool.h"
#include"http_conn.h"

#define MAX_FD 65535 //最大文件描述符个数
//虽然最大有这么多,但是跑起来支持不了这么多并发
#define MAX_EVENT_NUMBER 10000 //监听的最大的事件数量,过少的话程序性能不行

//添加信号捕捉
void addsig(int sig,void(handler)(int)){
    struct sigaction sa;
    memset(&sa,'\0',sizeof(sa));
    sa.sa_handler=handler;
    sigfillset(&sa.sa_mask);
    sigaction(sig,&sa,NULL);
}

//添加文件描述符到epoll中
extern void addfd(int epollfd,int fd,bool one_shot);

//从epoll中删除文件描述符
extern void removefd(int epollfd,int fd);

//修改文件描述符
extern void modfd(int epollfd,int fd,int ev);

int main(int argc,char* argv[]){

    if(argc<=1){
        printf("按照如下格式运行:%s port_number\n",basename(argv[0]));
        exit(-1);
    }

    //获取端口号
    int port = atoi(argv[1]);

    //对SIGPIE信号进行处理
    addsig(SIGPIPE,SIG_IGN);

    //创建线程池初始化信息
    threadpool<http_conn> * pool=NULL;
    try{
        pool=new threadpool<http_conn>;
    } catch(...){
        exit(-1);
    }//线程池都没创建好的话,直接退出
    
    //当前main函数是主线程,任务类是http_conn这个类
    //创建一个数组用于保存所有的客户端信息
    http_conn * users=new http_conn[ MAX_FD ];
    
    //写网络代码
    int listenfd=socket(PF_INET,SOCK_STREAM,0);

    //绑定前,设置端口复用
    int reuse=1;//只有为1这个值才表示复用
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));

    //绑定,绑定监听的套接字
    struct sockaddr_in address;
    address.sin_family=AF_INET;
    address.sin_addr.s_addr=INADDR_ANY;
    address.sin_port=htons(port);

    bind(listenfd,(struct sockaddr*)&address,sizeof(address));
    //监听
    listen(listenfd,5);

    //监听下面写epoll
    //创建epoll对象,事件数组,检测到之后把事件写到数组里面
    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd=epoll_create(5);

    //将监听的文件描述符添加到epoll对象中
    addfd(epollfd,listenfd,false);
    http_conn::m_epollfd=epollfd;

    //主线程不断循环检测有无事件发生
    while(1){
        int num=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);//num表示检测到了几个事件
        if((num<0)&&(errno!=EINTR)){
            printf("epoll failure\n");
            break;
        }

        //循环遍历事件数组
        for(int i=0;i<num;i++){
            int sockfd=events[i].data.fd;
            if(sockfd==listenfd){
                //有客户端连接进来
                //需要连接客户端,用accept
                struct sockaddr_in client_address;
                socklen_t client_addrlen=sizeof(client_address);

                int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlen);

                if(http_conn::m_user_count>=MAX_FD){
                    //目前的连接数满了,
                    //给客户端写一个信息:服务器内部正忙
                    close(connfd);
                    continue;
                }
                //将新的客户的数据初始化,放到数组中
                users[connfd].init(connfd,client_address);
            }else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
                //对方异常断开或者错误等事件
                users[sockfd].close_conn();
            }else if(events[i].events &EPOLLIN){
                if(users[sockfd].read()){
                    //一次性把所有数据都读完
                    //交给工作线程去处理
                    pool->append(users + sockfd);
                }else{
                    users[sockfd].close_conn();
                }
            }else if(events[i].events&EPOLLOUT){
                if(!users[sockfd].write()){
                    //一次性写完所有数据
                    users[sockfd].close_conn();
                }
            }



        }
    }
    //先实现流程,再去实现具体的方法
    close(epollfd);
    close(listenfd);
    delete [] users;
    delete pool;

    return 0;
}

threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include<pthread.h>
#include<list>
#include<exception>
#include<cstdio>
#include"locker.h"

//线程池类,定义成模板类是为了代码的复用,模板参数T是任务类
template<typename T>
class threadpool{
public:
    threadpool(int thread_number=8,int max_requests=10000);
    ~threadpool();
    bool append(T* request);

private:
    static void* worker(void * arg);
    void run();

private:
    //线程的数量
    int m_thread_number;

    //线程池数组,大小为m_thread_number
    pthread_t * m_threads;

    //请求队列中最多允许的,等待处理的请求数量
    int m_max_requests;

    //请求队列
    std::list< T*> m_workqueue;

    //互斥锁
    locker m_queuelocker;

    //信号量用来判断是否有任务需要处理
    sem m_queuestat;

    //是否结束线程
    bool m_stop;
};

template<typename T>
threadpool<T>::threadpool(int thread_number,int max_requests) :
    m_thread_number(thread_number),m_max_requests(max_requests),
    m_stop(false),m_threads(NULL){

    if((thread_number <=0)||(max_requests<=0)){
        throw std::exception();
    }

    m_threads=new pthread_t[m_thread_number];
    if(!m_threads){
        throw std::exception();
    }

    //创建thread_number个线程,并将它们设置为线程脱离
    for(int i=0;i<thread_number;i++){
        printf("create the %dth thread\n",i);

        if(pthread_create(m_threads + i,NULL,worker,this)!=0){
            delete [] m_threads;
            throw std::exception();
        }

        if(pthread_detach(m_threads[i])){
            delete[] m_threads;
            throw std::exception();
        }
    }
    }

template<typename T>
threadpool<T>::~threadpool(){
    delete[] m_threads;
    m_stop=true;
}

template<typename T>
bool threadpool<T>::append(T *request){
    m_queuelocker.lock();
    if(m_workqueue.size()>m_max_requests){
        m_queuelocker.unlock();
        return false;
    }

    m_workqueue.push_back(request);
    m_queuelocker.unlock();
    m_queuestat.post();
    return true;
}

template<typename T>
void* threadpool<T>::worker(void *arg){

    threadpool * pool=(threadpool *)arg;
    pool->run();
    return pool;
}

template<typename T>
void threadpool<T>::run(){
    while(!m_stop){
        m_queuestat.wait();
        m_queuelocker.lock();
        if(m_workqueue.empty()){
            m_queuelocker.unlock();
            continue;
        } 
        T* request=m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();

        if(!request){
            continue;
        }
        request->process();
    }
}
#endif

编译并创建线程池

编译所有cpp文件

g++ *.cpp -pthread

运行该程序并设置一个端口号10000
Webserver(5.4)项目整体-LMLPHP
创建了8个线程
Webserver(5.4)项目整体-LMLPHP

先记一下虚拟机的ip
192.168.227.129
去浏览器中用http访问一下
Webserver(5.4)项目整体-LMLPHP
Webserver(5.4)项目整体-LMLPHP

11-09 14:44