池化技术

在系统开发过程中,我们经常会用到池化技术。通俗的讲,池化技术就是:把一些资源预先分配好,组织到对象池中,之后的业务使用资源从对象池中获取,使用完后放回到对象池中。这样做带来几个明显的好处:

1),资源重复使用, 减少了资源分配和释放过程中的系统消耗。比如,在IO密集型的服务器上,并发处理过程中的子线程或子进程的创建和销毁过程,带来的系统开销将是难以接受的。所以在业务实现上,通常把一些资源预先分配好,如线程池,数据库连接池,Redis连接池,HTTP连接池等,来减少系统消耗,提升系统性能。

2),可以对资源的整体使用做限制。这个好理解,相关资源预分配且只在预分配是生成,后续不再动态添加,从而限制了整个系统对资源的使用上限。类似一个令牌桶的功能。

3),池化技术分配对象池,通常会集中分配,这样有效避免了碎片化的问题。

线程池

有了上面的池化技术铺垫,理解线程池就很容易了:

1)先启动若干数量的线程,并让这些线程都处于睡眠状态,并以一定的方式组织起来,形成“线程池”;

2)当客户端有一个新请求时,就会从线程池中选取一个线程,并唤醒线程,让它来处理客户端的这个请求;

3)当处理完这个请求后,线程又处于睡眠状态,并且放回到线程池中,等待任务到来调度。

线程池设计

池化技术与线程池的简单实现-LMLPHP

 如上图,是基于线程条件变量唤醒的简单线程池模型,主要包括了:任务,任务队列,工作线程几个组件。说明如下:

任务(job

任务是对一次可执行过的描述,通常包括执行的方法(执行函数)和数据。比如,一个报文接收过程就是一个任务,包括:报文处理函数和报文内容。本例中,对job的定义如下:

点击(此处)折叠或打开

  1. typedef struct thread_job {
  2.     struct list_head list;
  3.     struct thread_pool* pool;
  4.     job_process_func jfunc; // 任务执行函数
  5.     void* data; // 任务数据,由调用者分配,框架释放
  6. } job_t

任务队列(job_queue

任务队列缓存着所有待执行的任务,供各个工作线程调度。当工作线程空闲或被唤醒时,会尝试从任务队列中摘取一个任务,然后在线程空间内执行。任务队列用线程互斥锁进行保护。为了方便理解整个任务执行转换的过程,本例中,把空闲任务队列单独出来了。 所以,任务的状态有四种: 
    1)在空闲对列中等待业务;
    2)在业务流程中装配;
    3)在工作队列中等待调度;

    4)在工作线程中执行。

工作线程

     工作线程等待在条件变量上(job_dequeue_valid,新任务需要处理),当新任务到达时,线程被唤醒。线程会尝试从任务队列上获取任务,并执行它;执行完毕后重新等待在条件变量上。如此往复:

点击(此处)折叠或打开

  1. while (1)
  2.     {
  3.         if (thrdp_stopping(pool))
  4.         {
  5.             pthread_exit(NULL);
  6.         }

  7.         pthread_mutex_lock(&(pool->qlock));
  8.         while (list_empty(&pool->job_queue))
  9.         {
  10.             // nothing to process, wait until job come in
  11.             printf("thread get none job, hangup...\n", (unsigned int)pthread_self() );
  12.             pthread_cond_wait(&(pool->job_dequeue_valid), &(pool->qlock));

  13.             if (thrdp_stopping(pool))
  14.             {
  15.                 pthread_exit(NULL);
  16.             }

  17.         }
  18.         job = list_first_entry(&pool->job_queue, struct thread_job, list);
  19.         list_del(&job->list);
  20.         pthread_mutex_unlock(&(pool->qlock));
  21.         job->jfunc(job);
  22.     }

如上代码片段,特别提醒一下,当工作线程从等待条件中被唤醒时,一定要对当前的执行条件再检查,原因是当前线程可能是被“虚假唤醒”。

比如,当前队列是空的,有ABC三个工作线程都在等待任务;当一个任务R0到来时,A,B,C三个线程均可能被唤醒;由于任务队列锁的关系,其中只有一个(假如A)取得的实际任务,而BC均未取得任务,因此BC称为被“虚假唤醒”。此时,BC必须对任务获取状态进行再次判断,确保正常获取到任务,方能往下执行调度,否则,应该重新休眠等待任务。


代码实现

附完整代码和测试代码threadpool.h


点击(此处)折叠或打开

  1. #ifndef __THREAD_POOL_H__
  2. #define __THREAD_POOL_H__
  3. #include <linux/types.h>
  4. #include <pthread.h>
  5. #include <syslog.h>
  6. #include <sys/stat.h>
  7. #include <unistd.h>
  8. #include <stdio.h>
  9. #include <stdlib.h>
  10. #include <sys/socket.h>
  11. #include <sys/types.h>
  12. #include <string.h>
  13. #include <asm/types.h>
  14. #include <linux/netlink.h>
  15. #include <linux/socket.h>
  16. #include <linux/types.h>
  17. #include <sys/socket.h>
  18. #include <netinet/in.h>
  19. #include <arpa/inet.h>
  20. #include <errno.h>
  21. #include <unistd.h>

  22. #include "list.h"


  23. #define THRDP_STATUS_STOPPING 0x01 // 线程池是否已经关闭

  24. typedef void (*job_process_func)(void*);

  25. struct thread_pool
  26. {
  27.     int thread_num; // 线程池中开启线程的个数
  28.     int max_job_num; // 队列中最大job的个数
  29.     struct list_head job_queue; // 待处理的job队列
  30.     struct list_head free_job_queue; // job空闲池


  31.     pthread_t *pthreads; //线程池中所有线程的pthread_t
  32.     pthread_mutex_t qlock; //互斥信号量,保护job_queue
  33.     pthread_mutex_t fqlock; //互斥信号量,保护free_job_queue
  34.     pthread_cond_t job_dequeue_valid; // 隊列不為空,需要喚醒線程處理
  35.     pthread_cond_t job_enqueue_valid; // 隊列沒有滿,可以繼續添加job

  36.     int flags;
  37. };

  38. typedef struct thread_job {
  39.     struct list_head list;
  40.     struct thread_pool* pool;
  41.     job_process_func jfunc; // 任务执行函数
  42.     void* data; // 任务数据,由调用者分配,框架释放
  43. } job_t;

  44. struct thread_pool* THRDP_new(unsigned int thread_num, unsigned int max_job_num);
  45. void THRDP_destroy(struct thread_pool* pool);
  46. struct thread_job* THRDP_job_get(struct thread_pool* pool);
  47. void THRDP_job_put(struct thread_pool* pool, struct thread_job* job);
  48. void THRDP_job_add(struct thread_pool* pool, struct thread_job* job);

  49. #endif

    注意,上面引用了list.h,请查看我之前发的用户态list的头文件。
    threadpool.c

点击(此处)折叠或打开

  1. #include "threadpool.h"


  2. static inline int thrdp_stopping (struct thread_pool* pool)
  3. {
  4.     return pool->flags & THRDP_STATUS_STOPPING;
  5. }

  6. static void* thrdp_process_func(void* arg)
  7. {
  8.     struct thread_pool* pool = (struct thread_pool*)arg;
  9.     struct thread_job* job;

  10.     while (1)
  11.     {
  12.         if (thrdp_stopping(pool))
  13.         {
  14.             pthread_exit(NULL);
  15.         }

  16.         pthread_mutex_lock(&(pool->qlock));
  17.         while (list_empty(&pool->job_queue))
  18.         {
  19.             // nothing to process, wait until job come in
  20.             printf("thread get none job, hangup...\n", (unsigned int)pthread_self() );
  21.             pthread_cond_wait(&(pool->job_dequeue_valid), &(pool->qlock));

  22.             if (thrdp_stopping(pool))
  23.             {
  24.                 pthread_exit(NULL);
  25.             }

  26.         }
  27.         job = list_first_entry(&pool->job_queue, struct thread_job, list);
  28.         list_del(&job->list);
  29.         pthread_mutex_unlock(&(pool->qlock));
  30.         job->jfunc(job);
  31.     }
  32. }


  33. struct thread_pool* THRDP_new(unsigned int thread_num, unsigned int max_job_num)
  34. {
  35.     struct thread_pool* pool = NULL;
  36.     struct thread_job* job;
  37.     struct thread_job* tmp;
  38.     int i;

  39.     if(thread_num < 2)
  40.     {
  41.         thread_num = 2;
  42.     }

  43.     if (max_job_num < thread_num)
  44.     {
  45.         max_job_num = thread_num;
  46.     }

  47.     pool = malloc(sizeof(struct thread_pool));
  48.     if (NULL == pool)
  49.     {
  50.         printf("failed to malloc thread_pool!\n");
  51.         return NULL;
  52.     }
  53.     memset(pool, 0, sizeof(struct thread_pool) );
  54.     pool->thread_num = thread_num;
  55.     pool->max_job_num = max_job_num;

  56.     INIT_LIST_HEAD(&pool->job_queue);
  57.     INIT_LIST_HEAD(&pool->free_job_queue);

  58.     if (pthread_mutex_init(&(pool->qlock), NULL))
  59.     {
  60.         printf("failed to init job queue mutex lock!\n");
  61.         goto ERROR_OUT;
  62.     }
  63.     if (pthread_mutex_init(&(pool->fqlock), NULL))
  64.     {
  65.         printf("failed to init free job queue mutex lock!\n");
  66.         goto ERROR_OUT1;
  67.     }
  68.     if (pthread_cond_init(&(pool->job_dequeue_valid), NULL))
  69.     {
  70.         printf("failed to init cond job_dequeue_valid!\n");
  71.         goto ERROR_OUT2;
  72.     }
  73.     if (pthread_cond_init(&(pool->job_enqueue_valid), NULL))
  74.     {
  75.         printf("failed to init cond job_enqueue_valid!\n");
  76.         goto ERROR_OUT3;
  77.     }

  78.     for (i = 0; i < max_job_num; i++)
  79.     {
  80.         job = (struct thread_job*)malloc(sizeof(struct thread_job));
  81.         if (NULL == job)
  82.         {
  83.             goto ERROR_OUT4;
  84.         }
  85.         memset(job, 0, sizeof(struct thread_job));
  86.         list_add(&job->list, &pool->free_job_queue);
  87.     }

  88.     pool->pthreads = malloc(sizeof(pthread_t) * thread_num);
  89.     if (NULL == pool->pthreads)
  90.     {
  91.         printf("failed to malloc pthreads!\n");
  92.         goto ERROR_OUT4;
  93.     }
  94.     memset(pool->pthreads, 0, sizeof(pthread_t) * thread_num);


  95.     for (i = 0; i < pool->thread_num; ++i)
  96.     {
  97.         pthread_create(&(pool->pthreads[i]), NULL, thrdp_process_func, (void*)pool);
  98.     }

  99.     printf("create pool done\n");
  100.     return pool;

  101. ERROR_OUT4:
  102.     if (!list_empty(&pool->free_job_queue))
  103.     {
  104.         list_for_each_entry_safe(job, tmp, &pool->free_job_queue, list)
  105.         {
  106.             list_del(&job->list);
  107.             free(job);
  108.         }
  109.     }
  110.     pthread_cond_destroy(&(pool->job_enqueue_valid));
  111. ERROR_OUT3:
  112.     pthread_cond_destroy(&(pool->job_dequeue_valid));
  113. ERROR_OUT2:
  114.     pthread_mutex_destroy(&(pool->fqlock));
  115. ERROR_OUT1:
  116.     pthread_mutex_destroy(&(pool->qlock));
  117. ERROR_OUT:
  118.     free(pool);
  119.     return NULL;

  120. }

  121. void THRDP_destroy(struct thread_pool* pool)
  122. {
  123.     struct thread_job* job;
  124.     struct thread_job* tmp;
  125.     int i;

  126.     if (!pool)
  127.     {
  128.         return;
  129.     }
  130.     pthread_mutex_lock(&(pool->qlock));
  131.     if (thrdp_stopping(pool)) //线程池已经退出了,就直接返回
  132.     {
  133.         pthread_mutex_unlock(&(pool->qlock));
  134.         return;
  135.     }

  136.     // 設置stop標記,喚醒阻塞的線程,使其退出
  137.     pool->flags |= THRDP_STATUS_STOPPING;
  138.     pthread_mutex_unlock(&(pool->qlock));

  139.     pthread_cond_broadcast(&(pool->job_dequeue_valid)); //唤醒线程池中正在阻塞的线程
  140.     pthread_cond_broadcast(&(pool->job_enqueue_valid)); //唤醒添加任务的threadpool_add_job函数

  141.     for (i = 0; i < pool->thread_num; ++i)
  142.     {
  143.         pthread_join(pool->pthreads[i], NULL); //等待线程池的所有线程执行完毕
  144.     }
  145.     free(pool->pthreads);

  146.     // all threads had done, no one using jobs, just free them
  147.     if (!list_empty(&pool->job_queue))
  148.     {
  149.         list_for_each_entry_safe(job, tmp, &pool->job_queue, list)
  150.         {
  151.             list_del(&job->list);
  152.             free(job);
  153.         }
  154.     }

  155.     if (!list_empty(&pool->free_job_queue))
  156.     {
  157.         list_for_each_entry_safe(job, tmp, &pool->free_job_queue, list)
  158.         {
  159.             list_del(&job->list);
  160.             free(job);
  161.         }
  162.     }

  163.     pthread_mutex_destroy(&(pool->qlock));
  164.     pthread_mutex_destroy(&(pool->fqlock));
  165.     pthread_cond_destroy(&(pool->job_dequeue_valid));
  166.     pthread_cond_destroy(&(pool->job_enqueue_valid));
  167. }

  168. /**
  169. * get a free JOB
  170. * if all jobs busy, wait util someone be free
  171. */
  172. struct thread_job* THRDP_job_get(struct thread_pool* pool)
  173. {
  174.     struct thread_job* job;

  175.     if (!pool || thrdp_stopping(pool))
  176.     {
  177.         return NULL;
  178.     }

  179.     pthread_mutex_lock(&(pool->fqlock));
  180.     while (list_empty(&pool->free_job_queue))
  181.     {
  182.         pthread_cond_wait(&(pool->job_enqueue_valid), &(pool->fqlock));
  183.         if (thrdp_stopping(pool))
  184.         {
  185.             pthread_mutex_unlock(&(pool->fqlock));
  186.             return NULL;
  187.         }

  188.     }
  189.     job = list_first_entry(&pool->free_job_queue, struct thread_job, list);
  190.     list_del(&job->list);
  191.     pthread_mutex_unlock(&(pool->fqlock));
  192.     job->pool = pool;
  193.     return job;
  194. }


  195. /**
  196. * put back a used job back to free_job_queue
  197. *
  198. * this will free the job data which MUST allocated dynamically
  199. */
  200. void THRDP_job_put(struct thread_pool* pool, struct thread_job* job)
  201. {
  202.     int notify = 0;
  203.     if (!pool || !job )
  204.     {
  205.         return;
  206.     }
  207.     if (job->data)
  208.     {
  209.         free(job->data);
  210.         job->data = NULL;
  211.     }

  212.     memset(job, 0, sizeof(struct thread_job));

  213.     pthread_mutex_lock(&(pool->fqlock));
  214.     notify = list_empty(&pool->free_job_queue);
  215.     list_add_tail(&job->list, &pool->free_job_queue);
  216.     pthread_mutex_unlock(&(pool->fqlock));

  217.     if (notify)
  218.     {
  219.         printf("put back %p to free queue, notify wait jos tasks\n", job);
  220.         pthread_cond_broadcast(&(pool->job_enqueue_valid));
  221.     }
  222. }

  223. /**
  224. * add a job to thread-pool for schedule
  225. */
  226. void THRDP_job_add(struct thread_pool* pool, struct thread_job* job)
  227. {
  228.     int notify = 0;

  229.     if (!pool || !job )
  230.     {
  231.         return;
  232.     }

  233.     pthread_mutex_lock(&(pool->qlock));
  234.     notify = list_empty(&pool->job_queue);
  235.     list_add_tail(&job->list, &pool->job_queue);
  236.     pthread_mutex_unlock(&(pool->qlock));

  237.     if (notify)
  238.     {
  239.         // notify somebody to handle the new job
  240.         printf("add new job, notify someone to handle it\n");
  241.         pthread_cond_broadcast(&(pool->job_dequeue_valid));
  242.     }
  243. }

测试文件thrdp_test.c


点击(此处)折叠或打开

  1. #include <stdio.h>

  2. #include <string.h>
  3. #include <stdlib.h>
  4. #include <syslog.h>
  5. #include <sys/socket.h>
  6. #include <sys/types.h>
  7. #include <signal.h>
  8. #include <getopt.h>
  9. #include <netinet/in.h>
  10. #include <arpa/inet.h>
  11. #include <string.h>
  12. #include <sys/select.h>
  13. #include <sys/un.h>
  14. #include <stddef.h>
  15. #include <unistd.h>
  16. #include <errno.h>
  17. #include <sys/stat.h>
  18. #include <fcntl.h>
  19. #include <sys/wait.h>
  20. #include <unistd.h>
  21. #include <uuid/uuid.h>
  22. #include <sys/time.h>

  23. #include "threadpool.h"

  24. #define SRV_PORT 9966
  25. #define UDP_PKG_MAX_LEN 4096

  26. struct pkg_desc
  27. {
  28.     int srvfd;
  29.     struct sockaddr_in cliaddr;
  30.     char data[UDP_PKG_MAX_LEN];
  31. };


  32. void pkg_process(void* job)
  33. {
  34.     struct thread_job* pjob = (struct thread_job*) job;
  35.     struct pkg_desc* ppkg = (struct pkg_desc*) pjob->data;

  36.     printf("************* process in %u ******************\n", (unsigned int)pthread_self());
  37.     printf("content:: %s\n", ppkg->data);
  38.     sendto(ppkg->srvfd, ppkg->data, strlen(ppkg->data), 0, (struct sockaddr*)&ppkg->cliaddr, sizeof(ppkg->cliaddr));
  39.     THRDP_job_put(pjob->pool, pjob);
  40. }


  41. void start_server(void)
  42. {
  43.     struct sockaddr_in srvin, cliin;
  44.     socklen_t addrlen;
  45.     int cmd_mgmt_fd;
  46.     struct thread_pool* pool = NULL;


  47.     cmd_mgmt_fd = socket(AF_INET, SOCK_DGRAM, 0);
  48.     if(cmd_mgmt_fd < 0)
  49.     {
  50.         printf("start_cmd_mgmt_process socket fd failed.\n");
  51.         return;
  52.     }

  53.     memset(&srvin, 0, sizeof(struct sockaddr_in));
  54.     srvin.sin_family = AF_INET;
  55.     srvin.sin_addr.s_addr = inet_addr("127.0.0.1");
  56.     srvin.sin_port = htons(SRV_PORT);

  57.     if(bind(cmd_mgmt_fd, (struct sockaddr*)&srvin, sizeof(struct sockaddr_in)) < 0)
  58.     {
  59.         printf("bind error");
  60.     }


  61.     if ((pool = THRDP_new(2, 8)) == NULL)
  62.     {
  63.         printf("create thread pool error\n");
  64.         return;
  65.     }

  66.     while(1)
  67.     {
  68.         struct pkg_desc* ppkg = NULL;
  69.         struct thread_job* job = THRDP_job_get(pool);
  70.         int count;

  71.         if (NULL == job)
  72.         {
  73.             printf("get thread process task error\n");
  74.             return;
  75.         }
  76.         ppkg = (struct pkg_desc*) malloc(sizeof(struct pkg_desc));

  77.         if (NULL == ppkg)
  78.         {
  79.             printf("malloc pkg error\n");
  80.             return;
  81.         }
  82.         memset(ppkg, 0, sizeof(struct pkg_desc));


  83.         addrlen = sizeof(struct sockaddr_in);
  84.         count = recvfrom(cmd_mgmt_fd, ppkg->data, UDP_PKG_MAX_LEN, 0, (struct sockaddr*)&ppkg->cliaddr, &addrlen);
  85.         if(count <= 0)
  86.         {
  87.             printf("recvform error\n");
  88.             continue;
  89.         }

  90.         ppkg->srvfd = cmd_mgmt_fd;
  91.         job->data = ppkg;
  92.         job->jfunc = pkg_process;
  93.         THRDP_job_add(pool, job);
  94.     }
  95. }


  96. void start_client(int client_id)
  97. {
  98.     struct sockaddr_in srvaddr;
  99.     struct sockaddr_in peeraddr;
  100.     int len = sizeof(peeraddr);
  101.     int fd;
  102.     char ctnbuf[512] = {0};
  103.     char rcvbuf[512] = {0};
  104.     unsigned int i = 0;
  105.     fd = socket(AF_INET, SOCK_DGRAM, 0);
  106.     if(fd < 0)
  107.     {
  108.         printf("start_client socket fd failed.\n");
  109.         return;
  110.     }

  111.     memset(&srvaddr, 0, sizeof(struct sockaddr_in));
  112.     srvaddr.sin_family = AF_INET;
  113.     srvaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
  114.     srvaddr.sin_port = htons(SRV_PORT);


  115.     while(1)
  116.     {

  117.         memset(ctnbuf, 0, 512);
  118.         sprintf(ctnbuf, "%d say just test:%u", client_id, i++);

  119.         sendto(fd, ctnbuf, strlen(ctnbuf), 0, (struct sockaddr*)&srvaddr, sizeof(srvaddr));
  120.         memset(rcvbuf, 0, 512);
  121.         len = sizeof(peeraddr);
  122.         recvfrom(fd, rcvbuf, 512, 0, (struct sockaddr*)&peeraddr, &len);
  123.         printf("recv content:%s\n", rcvbuf);
  124.     }
  125. }

  126. int main(int argc, char** argv)
  127. {
  128.     if (argc < 2)
  129.     {
  130.         printf("usage : %s [-s/-c] [cid]", argv[0]);
  131.         return 0;
  132.     }
  133.     if (!strcmp(argv[1], "-s"))
  134.     {
  135.         start_server();
  136.     }
  137.     else
  138.     {
  139.         start_client(atoi(argv[2]));
  140.     }
  141.     return 0;

  142. }

测试是一个简单的udp回射服务器模型,用线程池的方式做服务端并发。

编译: gcc -g -o thrdp *.c

启动server端: ./thrdp -s

多终端启动client端: ./thrdp -c 100./thrdp -c 102 ...

运行截图如下
池化技术与线程池的简单实现-LMLPHP




10-27 15:58