最近几周笔者几篇有关陶建辉老师最新的创业项目-TdEngine代码解读文章(https://blog.csdn.net/BEYONDMA/article/details/97495298https://blog.csdn.net/BEYONDMA/article/details/96578186)出人意料的引起了巨大的反响,原以为C语言已经是昨日黄花,不过从读者的留言来看,C语言还是老当益壮,依旧有着巨大的影响力,笔者就以此为契机不断向陶老师请教,这次再给大家带来TdEngine计时器的代码解读。

其主要源码地址如下:https://github.com/taosdata/TDengine/blob/master/src/util/src/ttimer.c

    TDEngine为何要自己实现Timer

   其实一开始在读到这段代码时笔者也有类似的疑问,因为操作系统的内核基本都实现了定时器的功能,可以直接调用,但是深入思考一下就会发现由于TdEginge本身是个时序数据库的应用,而由于数据库的特殊性,其对库底层的需求其实与操作系统的内核需求类似,我们知道直接调用操作系统的timer需要在到时后启动一个对应的线程去处理对应的中断请求,而这对于TdEginge这种动辙需要上万个定时器的数据库应用来说无疑是一笔巨大的开销,这显然不是陶老师这种极端要求效率的程序员能够接受的。所以TDEngine的定时器的基本思路是基于操作系统的timer,来封装自身的定时器功能,使所有的timer控制器运行在一个线程池,而在同一timer控制器下的timer则运行在同一线程内以此来达到节约资源的目的。

TDEngine Timer的基本工作原理

简单来说TDEngine Timer的工作流程如下:

一.Timer初始化

主要完成以下工作:

1.初始化Timer线程池

2.启动操作系统的Timer,并注册处理timer循环处理函数taosTimerLoopFunc。

二.Timer启动

1.计算当前需要启动timer的调起序列号(index),即taosTimerLoopFunc运行在第几个Loop时会调起当前的timer的处理函数。

2.对于调起序列号相同的timer加入双链表tmrList[当前index],其在链表中的位置依据其到期时间的先后排序,注:(由于之前启动的操作系统timer也是有循环周期的,所以TDEngine timer也可能不是要在当前周期内调起。所以调起序列号相同,但是调起周期运可能有前后次序,其在链表中的位置其实是要买调起周期的先后排序的。)

工作过程图示:由于作者美工基础较差,所以虽然花了很久但是下图的效果其实一般,请大家能够直观感受即可。

200行代码为大家解读这个Github冠军项目背后的定时器-LMLPHP

 

     简要的讲,taosTimerLoopFunc函数循环处理tmrList,并将当前启动序列号对应的tmrList交由taosTmrProcessList处理,taosTmrProcessList调用处在当前启动序列号(index)且处在当前循环序列号(cycle)的timer的回调(handler)函数。

 结合代码的解读

1.初始化函数的解读,具体代码及注释如下:

void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) {
  static pthread_once_t tmrInit = PTHREAD_ONCE_INIT;
  tmr_ctrl_t *          pCtrl;

  pthread_once(&tmrInit, taosTmrModuleInit);//pthread_once保证TmrModule只会被运行一次

  int tmrCtrlId = taosAllocateId(tmrIdPool);

  if (tmrCtrlId < 0) {
    tmrError("%s bug!!! too many timers!!!", label);
    return NULL;
  }

  pCtrl = tmrCtrl + tmrCtrlId;
  tfree(pCtrl->tmrList);
  tmrMemPoolCleanUp(pCtrl->poolHandle);

  memset(pCtrl, 0, sizeof(tmr_ctrl_t));

  pCtrl->tmrCtrlId = tmrCtrlId;
  strcpy(pCtrl->label, label);
  pCtrl->maxNumOfTmrs = maxNumOfTmrs;

  if ((pCtrl->poolHandle = tmrMemPoolInit(maxNumOfTmrs + 10, sizeof(tmr_obj_t))) == NULL) {
    tmrError("%s failed to allocate mem pool", label);
    tmrMemPoolCleanUp(pCtrl->poolHandle);
    return NULL;
  }

  if (resolution < MSECONDS_PER_TICK) resolution = MSECONDS_PER_TICK;//初始化分辨率
  pCtrl->resolution = resolution;
  pCtrl->maxTicks = resolution / MSECONDS_PER_TICK;//初始化最大的tick
  pCtrl->ticks = rand() / pCtrl->maxTicks;
  pCtrl->numOfPeriods = longest / resolution;//初始化最大周期
  if (pCtrl->numOfPeriods < 10) pCtrl->numOfPeriods = 10;

  pCtrl->tmrList = (tmr_list_t *)malloc(sizeof(tmr_list_t) * pCtrl->numOfPeriods);//初始化tmrList目前还都是NULL
  for (int i = 0; i < pCtrl->numOfPeriods; i++) {
    pCtrl->tmrList[i].head = NULL;
    pCtrl->tmrList[i].count = 0;
  }

  if (pthread_mutex_init(&pCtrl->mutex, NULL) < 0) {
    tmrError("%s failed to create the mutex, reason:%s", label, strerror(errno));
    taosTmrCleanUp(pCtrl);
    return NULL;
  }

  pCtrl->signature = pCtrl;
  numOfTmrCtrl++;
  tmrTrace("%s timer ctrl is initialized, index:%d", label, tmrCtrlId);
  return pCtrl;//初始化完成
}

2.模块初化函数:我们看到在初始化函数中调用了模块初始化函数进行线程池及操作系统定时器的启动处理,其具体代码及注释如下:

void taosTmrModuleInit(void) {
  tmrIdPool = taosInitIdPool(maxNumOfTmrCtrl);//初始化id池
  memset(tmrCtrl, 0, sizeof(tmrCtrl));

#ifdef LINUX
  pthread_t      thread;
  pthread_attr_t tattr;
  pthread_attr_init(&tattr);
  pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
  if (pthread_create(&thread, &tattr, taosProcessAlarmSignal, NULL) != 0) {
    tmrError("failed to create timer thread");//初始化操作系统timer
    return;
  }

  pthread_attr_destroy(&tattr);
#else
  taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK);
#endif

  tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr");//初始化handle的Scheduler
  tmrTrace("timer module is initialized, thread:%d", taosTmrThreads);
}

3.timer启动函数

tmr_h taosTmrStart(void (*fp)(void *, void *), int mseconds, void *param1, void *handle) {
  tmr_obj_t * pObj, *cNode, *pNode;
  tmr_list_t *pList;
  int         index, period;
  tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle;

  if (handle == NULL) return NULL;

  period = mseconds / pCtrl->resolution;//初始化period数
  if (pthread_mutex_lock(&pCtrl->mutex) != 0)
    tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno));

  pObj = (tmr_obj_t *)tmrMemPoolMalloc(pCtrl->poolHandle);
  if (pObj == NULL) {
    tmrError("%s reach max number of timers:%d", pCtrl->label, pCtrl->maxNumOfTmrs);
    pthread_mutex_unlock(&pCtrl->mutex);
    return NULL;
  }

  pObj->cycle = period / pCtrl->numOfPeriods;//初始化周期数,即使用period除以每个循环中共有几个period,得到需要在第几个循环中调起timer
  pObj->param1 = param1;
  pObj->fp = fp;
  pObj->timerId = pObj;
  pObj->pCtrl = pCtrl;

  index = (period + pCtrl->periodsFromStart) % pCtrl->numOfPeriods;//初始化启动序列号
  int cindex = (pCtrl->periodsFromStart) % pCtrl->numOfPeriods;
  pList = &(pCtrl->tmrList[index]);

  pObj->index = index;
  cNode = pList->head;
  pNode = NULL;

  while (cNode != NULL) {
    if (cNode->cycle < pObj->cycle) {
      pNode = cNode;
      cNode = cNode->next;
    } else {
      break;
    }
  }

  pObj->next = cNode;
  pObj->prev = pNode;

  if (cNode != NULL) {
    cNode->prev = pObj;
  }

  if (pNode != NULL) {
    pNode->next = pObj;
  } else {
    pList->head = pObj;
  }

  pList->count++;
  pCtrl->numOfTmrs++;
  //以上为将相同index的timer按照先后顺序在pList中排序,具体下面会有图例展示。
  if (pthread_mutex_unlock(&pCtrl->mutex) != 0)
    tmrError("%s mutex unlock failed, reason:%s", pCtrl->label, strerror(errno));

  tmrTrace("%s %p, timer started, fp:%p, tmr_h:%p, index:%d, total:%d cindex:%d", pCtrl->label, param1, fp, pObj, index,
           pCtrl->numOfTmrs, cindex);

  return (tmr_h)pObj;
}

      可能各位读者也被以上代码中的pobj,cnode,pnode搞的晕头转向,下面我们以3个timer为例,假如他们的index和cycle都相同,那么他们分别调用完taosTmrStart之后tmrList会是什么情况,可以参考下表。

 4.loopFunc和taosTmrProcessList

具体代码及注释如下:

void taosTmrProcessList(tmr_ctrl_t *pCtrl) {
  unsigned int index;
  tmr_list_t * pList;
  tmr_obj_t *  pObj, *header;

  pthread_mutex_lock(&pCtrl->mutex);
  index = pCtrl->periodsFromStart % pCtrl->numOfPeriods;//计算当前index
  pList = &pCtrl->tmrList[index];

  while (1) {
    header = pList->head;
    if (header == NULL) break;

    if (header->cycle > 0) {/*如当前index对应的tmrList[index]的cycle大于0则下面会将其cycle减1*/
      pObj = header;
      while (pObj) {
        pObj->cycle--;
        pObj = pObj->next;
      }
      break;
    }

    pCtrl->numOfTmrs--;
    tmrTrace("%s %p, timer expired, fp:%p, tmr_h:%p, index:%d, total:%d", pCtrl->label, header->param1, header->fp,
             header, index, pCtrl->numOfTmrs);

    pList->head = header->next;/*如运行到此处则当前header已经expired,重新整理pList*/
    if (header->next) header->next->prev = NULL;
    pList->count--;
    header->timerId = NULL;

    SSchedMsg schedMsg;
    schedMsg.fp = NULL;
    schedMsg.tfp = header->fp;
    schedMsg.ahandle = header->param1;
    schedMsg.thandle = header;
    taosScheduleTask(tmrQhandle, &schedMsg);

    tmrMemPoolFree(pCtrl->poolHandle, (char *)header);
  }

  pCtrl->periodsFromStart++;
  pthread_mutex_unlock(&pCtrl->mutex);
}
void *taosTimerLoopFunc(int signo) {
  tmr_ctrl_t *pCtrl;
  int         count = 0;

  for (int i = 1; i < maxNumOfTmrCtrl; ++i) {
    pCtrl = tmrCtrl + i;
    if (pCtrl->signature) {
      count++;
      pCtrl->ticks++;
      if (pCtrl->ticks >= pCtrl->maxTicks) {/*如当前的ticks已经大于maxTicks则需要调起taosTmrProcessList对当前index的timer进行处理*/
        taosTmrProcessList(pCtrl);
        pCtrl->ticks = 0;
      }
      if (count >= numOfTmrCtrl) break;
    }
  }

  return NULL;
}

最后我也想留一个开放性问题,也就是tmrList使用双链表实现的最大好处是什么,能否使用单链表实现?欢迎读者留言说出你的看法。

08-19 22:04