定义
线程池属于生产消费模型,管理维持固定数量的池式结构,避免线程频繁的创建和销毁
应用场景
当一类任务耗时,严重影响当前线程处理其他任务,异步执行
任务类型
耗时任务:
- CPU密集型
- IO密集型 ( 网络IO 磁盘IO)
线程数量
n * proc
数据结构设计:
任务设计:
typedef struct task_s {
void * next;
handler_pt func;
void * arg;
} task_t;
生产者线程: 发布任务
消费者线程: 取出任务,执行任务
数据结构为链表
队列设计:
typedef struct task_queue_s {
task_t *queue;
void * head;
void **tail;
int block; // 是否阻塞
spinlock_t lock; // 自选锁
pthread_muxtex_t mutex;
pthread_cond_t cond;
}task_queue_t;
队列: 存储任务,调度线程池 ,双端开口,先进先出,在多线程中执行,需要加锁
功能: 调取线程池中的消费者线程, 如果此时队列为空,谁(x线程)来取任务,谁阻塞休眠
当允许一个进程进入临界资源(互斥状态)。
自旋锁: 其他线程空转cpu,一直等待进入临界资源
互斥锁:切换cpu, 让出执行权, 线程阻塞住,操作系统调用其他的线程
某个线程持有临界资源的时间 < 线程切换的时间 , 自旋锁 ,时间复杂度为0(1)
生产者新增任务,消费者取出任务 ,0(1),均为移动指针完成(尾插法,头插法)
故使用自旋锁 spinlock_t lock
线程池设计
struct thredpool_s {
atomic_int quit; // 原子变量
int thrd_count;
pthread_t * threads;
task_queue_t task_queue;
};
原子操作:一个线程在执行过程中,其他线程不能执行这个线程的内部操作,只能看到线程执行前或者执行后
应用场景: 某一个基础类型给的变量
接口设计
static task_queue_t * __taksqueue_create() {
task_queue_t * queue = malloc(sizeof(*queue));
int ret;
ret = pthrad_mutex_init(mutex);
if(ret == 0) {
ret = pthread_cond_init();
if(ret == 0) {
queue->head = NULL;
queue->tail = &queue->head;
queue->block = 1;
return queue;
}
pthread_cond_destory(queue);
}
pthread_muext_destory(&queue->mutex);
return NULL;
}
__add_task(task_queue_t * queue, void * task) {
void **link = (void **)task; // malloc
*link = NULL; // task->next = NULL;
spinlock_lock(&queue->lock);
*queue->tail = link; // *queue 相当于 task_t * queue, 同时地址也是queue->next;末尾添加新的节点
queue->tail = link // tail 指向新的尾节点
spinlock_unlock(&qeuue->lock);
pthread_cond_signal(cond:&queue->cond); // 有任务,唤醒休眠的线程
}
__pop__task(task_queue_t * queue) {
spinlock_lock(&queue->lock);
if(queue->head) {
spinlock_unclock(&queue->lock);
return NULL;
}
task_t *task;
task = queue->head;
queue->head = task->next;
if(queue-head == NULL) {
queue->tail = &queue->head; // &NULL
}
spinlock_unlock(&queue->lock);
return task;
}
void * __get_task (task_queue_t *queue) {
task_t *task;
while((task = __pop_task(queue))== NULL) {
pthread_mutex_lock(&queue->lock);
if(queue->block == 0) {
rerurn NULL;
}
// pthread_cond_wait 执行过程:
// 1. 先unlock(&mutex)
// 2. cond 休眠
// 3, 生产者 发送signal
// 4. cond 唤醒
// 5. 既然上clock(&mutex)
pthead_cond_wait(cond:&queue->cond,mutex:&queue->mutex); //休眠
pthread_mutex_unlock(mutex:&queue->mutex);
}
return task;
}
void __taskqueue_destroy(task_queue_t * queue) {
task_t *task;
while((task) = _pop_task(queue)) {
free(ptr:task);
}
spinlock_destroy(lock:&queue->lock);
pthread_cond_destory(cond:&queue->cond);
pthread_mutex_destory(mutex:&queue->mutex);
free(ptr:queue);
}