负载均衡通讯转发分发器(G5)源代码分析
(以版本v1.1.0为准)

    G5源代码文件只有.c(2400行)和.h(260行)两个源文件,行数虽然不多,但是技术密集度较高,分析源码主要从基于epoll(ET)事件处理应用层框架和转发会话结构管理两方面入手。
    先来看数据结构。主要的数据结构有服务器环境大结构,里面包含了所有参数配置和运行时状态数据,是所有内部函数传递的第一个参数。
[code=c]
/* 服务器环境大结构 */
struct ServerEnv
{
    struct CommandParam        cmd_para ;
    
    struct ForwardRule        *forward_rule ;
    unsigned long            forward_rule_count ;
    
    int                event_env ;
    struct ForwardSession        *forward_session ;
    unsigned long            forward_session_maxcount ;
    unsigned long            forward_session_count ;
    unsigned long            forward_session_use_offsetpos ;
    
    struct ServerCache        server_cache ;
} ;
[/code]
    包含的众多成员中最应关注的是转发规则结构和转发会话结构
[code=c]
/* 转发规则结构 */
struct ForwardRule
{
    char                rule_id[ RULE_ID_MAXLEN + 1 ] ;
    char                rule_mode[ RULE_MODE_MAXLEN + 1 ] ;
    
    unsigned long            timeout ;
    
    struct ClientNetAddress        client_addr[ RULE_CLIENT_MAXCOUNT ] ;
    unsigned long            client_count ;
    
    struct ForwardNetAddress    forward_addr[ RULE_FORWARD_MAXCOUNT ] ;
    unsigned long            forward_count ;
    
    struct ServerNetAddress        server_addr[ RULE_SERVER_MAXCOUNT ] ;
    unsigned long            server_count ;
    unsigned long            select_index ;
    
    union
    {
        struct
        {
            unsigned long    server_unable ;
        } RR[ RULE_SERVER_MAXCOUNT ] ;
        struct
        {
            unsigned long    server_unable ;
        } LC[ RULE_SERVER_MAXCOUNT ] ;
        struct
        {
            unsigned long    server_unable ;
            struct timeval    tv1 ;
            struct timeval    tv2 ;
            struct timeval    dtv ;
        } RT[ RULE_SERVER_MAXCOUNT ] ;
    } status ;
} ;
[/code]
    转发规则结构里的数据主要来源于配置文件,还有一些是由配置处理得到。
    最后一块数据status则是用于负载均衡的内部状态跟踪。
[code=c]
/* 转发会话结构 */
struct ForwardSession
{
    char                forward_session_type ;
    
    struct ClientNetAddress        client_addr ;
    struct ListenNetAddress        listen_addr ;
    struct ServerNetAddress        server_addr ;
    unsigned long            client_session_index ;
    unsigned long            server_session_index ;
    
    struct ForwardRule        *p_forward_rule ;
    unsigned long            client_index ;
    
    char                connect_status ;
    unsigned long            try_connect_count ;
    
    unsigned long            active_timestamp ;
    
    char                io_buffer[ IO_BUFSIZE + 1 ] ;
    long                io_buflen ;
} ;
[/code]
    转发会话结构在转发连接建立时创建,连接断开时销毁。
    一个数据传输方向对应一个会话,一般一个连接请求对应两段TCP连接也对应两个会话结构。成员forward_session_type表示是客户端到服务端数据传输方向还是服务端到客户端数据传输方向。得到一个会话结构可以通过成员client_session_index和server_session_index得到另一个会话结构。
    成员p_forward_rule指向该会话使用的转发规则结构。
    成员client_addr、listen_addr和分别对应客户端、转发端(本地侦听端)和服务端信息。
    成员connect_status用于跟踪异步连接模式下的建立状态。
    成员io_buffer用于会话数据传输时的缓冲区,便于异步处理。
    
    了解了几个主要的数据结构后我们开始分析源代码。
    
    main函数主要解析命令行参数和显示语法,调用G5函数
    
    G5函数主要是创建epoll池、装载配置文件、调用服务器主工作循环(ServerLoop函数)、销毁epoll池
    pse->event_env = epoll_create( pse->forward_session_maxcount ) ;
    nret = LoadConfig( pse ) ;
    nret = ServerLoop( pse ) ;
    close( pse->event_env );
    
    ServerLoop函数负责服务器主循环,即批量等待epoll事件,有epoll事件发生时,处理之
    sock_count = epoll_wait( pse->event_env , events , WAIT_EVENTS_COUNT , 1000 ) ;
        如果是侦听端口事件
        if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_LISTEN )
            管理端口事件,调用函数AcceptManageSocket
            if( strcmp( p_forward_session->listen_addr.rule_mode , FORWARD_RULE_MODE_G ) == 0 )
                nret = AcceptManageSocket( pse , p_event , p_forward_session ) ;
            转发端口事件,调用函数AcceptForwardSocket
            else
                nret = AcceptForwardSocket( pse , p_event , p_forward_session ) ;
        如果是输入事件
        else if( p_event->events & EPOLLIN )
            管理端口输入事件,调用函数ReceiveOrProcessManageData
            if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_MANAGE )
                nret = ReceiveOrProcessManageData( pse , p_event , p_forward_session ) ;
            转发端口输入事件,调用函数TransferSocketData
            else if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_CLIENT || p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_SERVER )
                nret = TransferSocketData( pse , p_event , p_forward_session ) ;
        如果是输出事件
        else if( p_event->events & EPOLLOUT )
            异步连接建立响应事件,调用函数SetSocketConnected
            if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_SERVER && p_forward_session->connect_status == CONNECT_STATUS_CONNECTING )
                nret = SetSocketConnected( pse , p_event , p_forward_session ) ;
            异步发送sock可写事件,调用函数ContinueToWriteSocketData
            else if( p_forward_session->connect_status == CONNECT_STATUS_CONNECTED )
                nret = ContinueToWriteSocketData( pse , p_event , p_forward_session ) ;
        如果是错误事件
        else if( p_event->events & EPOLLERR )
            调用函数ResolveSocketError
            nret = ResolveSocketError( pse , p_event , p_forward_session ) ;
    
    函数AcceptForwardSocket用于在有客户端接入时,查询转发规则,转连到对应服务端上
        循环接受转发端口连接,epoll(ET)边缘触发时必须把一口气把所有事件都处理掉,包含客户端接入事件
            接受客户端接入
            client_addr.sock = accept( p_forward_session->listen_addr.sock , (struct sockaddr *) & (client_addr.netaddr.sockaddr) , & addr_len ) ;
            查询转发规则
            nret = MatchForwardRule( pse , & client_addr , & (p_forward_session->listen_addr) , & p_forward_rule , & client_index ) ;
            连接服务端(函数ConnectToRemote)
            nret = ConnectToRemote( pse , p_event , p_forward_session , p_forward_rule , client_index , & client_addr , TRY_CONNECT_MAXCOUNT ) ;
    
    函数ConnectToRemote用于有客户端接入时转连服务端
        创建转连的本地客户端sock
        server_addr.sock = socket( AF_INET , SOCK_STREAM , IPPROTO_TCP );
        设置非堵塞模式
        SetNonBlocking( server_addr.sock );
        根据转发规则,选择目标网络地址(函数SelectServerAddress)。如果有多个服务端地址的话按负载均衡算法选择
        nret = SelectServerAddress( pse , p_client_addr , p_forward_rule , server_addr.netaddr.ip , server_addr.netaddr.port ) ;
        连接目标网络地址
        nret = connect( server_addr.sock , ( struct sockaddr *) & (server_addr.netaddr.sockaddr) , addr_len );
        如果连接建立中
        if( nret < 0 ) if( errno != EINPROGRESS )
            登记服务端转发会话到会话池中(会话连接状态为正在连接中),登记sock到epoll池
            epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_server->server_addr.sock , & server_event );
        如果连接建立完成(本地连接自己时大概率发生)
            登记客户端转发会话到会话池中(会话连接状态为连接成功),登记sock到epoll池
            epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_client->client_addr.sock , & client_event );
            登记服务端转发会话到会话池中(会话连接状态为连接成功),登记sock到epoll池
            epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_server->server_addr.sock , & server_event );
    
    函数SetSocketConnected用于之前正在连接服务端的会话当连接成功事件发生时处理
        登记客户端转发会话到会话池中(会话连接状态为连接成功),登记sock到epoll池
        p_forward_session_client->connect_status = CONNECT_STATUS_CONNECTED ;
        epoll_ctl( pse->event_env , EPOLL_CTL_ADD , p_forward_session_client->client_addr.sock , & client_event );
        更新服务端转发会话连接状态连接成功
        p_forward_session_server->connect_status = CONNECT_STATUS_CONNECTED ;
        epoll_ctl( pse->event_env , EPOLL_CTL_MOD , p_forward_session_server->server_addr.sock , & server_event );
        (等连接建立后,一对会话进入等待数据输入事件EPOLLIN)
    
    函数TransferSocketData用于从一个准备好数据输入的会话中接收数据,并转发给服务端
        循环从客户端sock中接收数据,epoll(ET)边缘触发时必须把一口气把所有事件都处理掉
            接收通讯数据
            p_out_forward_session->io_buflen = recv( in_sock , p_out_forward_session->io_buffer , IO_BUFSIZE , 0 ) ;
            如果没有接收数据了,跳出循环
            if( p_out_forward_session->io_buflen < 0 ) if( errno == EAGAIN )
            如果接收失败,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话
            epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );
            epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );
            SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );
            如果接收到输入端断开连接事件,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话
            epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );
            epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );
            SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );
            循环发送数据到服务端sock
                发送通讯数据
                len = send( out_sock , p_out_forward_session->io_buffer , p_out_forward_session->io_buflen , 0 ) ;
                如果底层发送缓冲区满了,则暂停客户端sock输入事件EPOLLIN监控,启用服务端sock输出事件EPOLLOUT监控,跳出循环发送
                if( len < 0 ) if( errno == EAGAIN )
                    epoll_ctl( pse->event_env , EPOLL_CTL_MOD , in_sock , & in_event );
                    epoll_ctl( pse->event_env , EPOLL_CTL_MOD , out_sock , & out_event );
                如果发送出错,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话
                epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );
                epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );
                SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );
                如果发送完了,跳出循环
                else if( len == p_out_forward_session->io_buflen )
                    break;
                否则继续发送未发送数据
                else
                    p_out_forward_session->io_buflen -= len ;
                    memmove( p_out_forward_session->io_buffer , p_out_forward_session->io_buffer + len , p_out_forward_session->io_buflen );
    
    函数ContinueToWriteSocketData用于处理之前底层发送缓冲区满了引发的异步发送机制
        循环继续发送未发送数据到服务端sock
            发送通讯数据
            len = send( out_sock , p_out_forward_session->io_buffer , p_out_forward_session->io_buflen , 0 ) ;
            如果底层发送缓冲区满了,跳出循环发送
            if( len < 0 ) if( errno == EAGAIN )
                break;
            如果发送出错,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话
                epoll_ctl( pse->event_env , EPOLL_CTL_DEL , in_sock , NULL );
                epoll_ctl( pse->event_env , EPOLL_CTL_DEL , out_sock , NULL );
                SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );
            如果发送完了,恢复客户端sock输入事件EPOLLIN监控,暂停服务端sock输出事件EPOLLOUT监控,跳出循环
            else if( len == p_out_forward_session->io_buflen )
                epoll_ctl( pse->event_env , EPOLL_CTL_MOD , in_sock , & in_event );
                epoll_ctl( pse->event_env , EPOLL_CTL_MOD , out_sock , & out_event );
                break;
            否则继续发送
            else
                p_out_forward_session->io_buflen -= len ;
                memmove( p_out_forward_session->io_buffer , p_out_forward_session->io_buffer + len , p_out_forward_session->io_buflen );
    
    函数ResolveSocketError用于所有sock出错时,关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话
        如果是异步连接事件
        if( p_forward_session->forward_session_type == FORWARD_SESSION_TYPE_SERVER && p_forward_session->connect_status == CONNECT_STATUS_CONNECTING )
            设置之前选择目标服务器不可用状态
            nret = OnServerUnable( pse , p_forward_session->p_forward_rule ) ;
            选择新目标服务器,连接之
            nret = ConnectToRemote( pse , p_event , p_forward_session , p_forward_session->p_forward_rule , p_forward_session->client_index , & (p_forward_session->client_addr) , --p_forward_session->try_connect_count ) ;
            清理之前临时创建的服务端会话结构
            epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );
            SetForwardSessionUnitUnused( p_forward_session );
        如果是数据交换报错事件
            关闭客户端和服务端sock、删除epoll池中一对sock,删除转发会话
            epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->client_addr.sock , NULL );
            epoll_ctl( pse->event_env , EPOLL_CTL_DEL , p_forward_session->server_addr.sock , NULL );
            SetForwardSessionUnitUnused2( & (pse->forward_session[p_forward_session->client_session_index]) , & (pse->forward_session[p_forward_session->server_session_index]) );
    
    epoll(ET)事件处理应用层框架源代码主线分析完成,希望通过分析其源代码,可以帮助读者理解其设计思路和代码结构,审核作者编码缺陷,提高使用准确性,也有助于读者在许可证允许条件下fork自己的版本。
    如有意见或建议欢迎及时联系我
    开源项目首页 : http://git.oschina.net/calvinwilliams/G5
    作者邮箱 : [email protected]

05-05 02:34