最近开始看Tair的源码实现,Tair的通信使用的是淘宝的开源的网络库tbnet实现。具体来说是依靠tbnet::Transport类型实现,其源代码路径如下:
http://code.taobao.org/svn/tb-common-utils/trunk/tbnet/src

下面是Transport的简单类图:

          浅析淘宝网络通信库tbnet的实现-LMLPHP

下面介绍其通信流程:

1. 启动
Transport::start()完成其启动,主要工作是启动了两个线程:_readWriteThread和_timeoutThread. 这两个线程的实际入口函数式Tranport::run(), 下面是Transport::run的实现:

点击(此处)折叠或打开

  1. void Transport::run(tbsys::CThread *thread, void *arg) {
  2.     if (thread == &_timeoutThread) {
  3.         timeoutLoop();
  4.     } else {
  5.         eventLoop((SocketEvent*)arg);
  6.     }
  7. }
    A. 读写线程:
        1. 读写线程使用epoll实现,在Transport中存在一个EPollSocketEvent _socketEvent 成员变量, arg传入的是这个成员变量的指针,EPollSocketEvent是epoll个操作的封装,在EPollSocketEvent的构造函数中会调用epoll_create初始化epoll。
        2. eventLoop的实现步骤如下:
                a.  调用socketEvent->getEvents() 取得发生的事件并放入到ioevents数组中
                        i.  调用epoll_wait等待读写事件;
                        ii. 每个事件的events[i].data.ptr存放有事件对应的IOComonet(socket 封装),可以用来处理读写事件;
                b.  对于读写事件,分别调用ioc->handleReadEvent()和ioc->handleWriteEvent()处理;
    
    B. timeout线程
        1. 主要用于检查通信的socket是否超过一定时常没有使用,对于TCPComponent(封装通信socket),如果15分钟没有使用,则断开连接;

2. 监听
通过调用Transport::listen()完成监听, 主要完成以下步骤:
    a.  创建TCPAcceptor,继承于IOComonet(socket 封装),并启动异步监听;
    b. 调用addComponent(acceptor, true, false);
            i.  创建epoll_event ev,设置ev.data.ptr = socket->getIOComponent(),用来处理读写事件的IOComponentsocket 封装)
            ii. 调用epoll_ctl(_iepfd,EPOLL_CTL_ADD,socket->getSocketHandle(),&ev)注册监听socket上的读取事件;
    c. 当客户端有connect请求过来的时候会触发监听socket上的读取事件,TCPAcceptor::handleReadEvent()会被调用。

3. 接受客户端的连接请求
系统在TCPAcceptor::handleReadEvent()中接受客户端的连接请求,主要步骤如下:
    a. socket = ((ServerSocket*)_socket)->accept() 接受连接请求并得到通信socket;
    b. 创建TCPComponent封装通信socket;
    c. 调用addComponent(component, true, false) 注册当前socket的读取事件。

4. 数据通信
    A. 数据读取
         数据通信由通信socket完成,该socket在epoll中注册,当有数据需要读取的时候会触发读取事件并调用TCPComponent::handleReadEvent()处理。
    B. 数据发送
         当有数据需要发送时Connection::postPacket()函数, 这个函数中会调用_iocomponent->enableWrite(true),注册写入事件,并调用TCPComponent::handleWriteEvent()处理。
12-17 00:38