线程池设计
TaskQueue
类
#pragma once
#include <queue>
#include <pthread.h>
using callback = void (*)(void *arg);
// 任务结构体
template <typename T>
struct Task
{
Task()
{
function = nullptr;
arg = nullptr;
}
Task(callback f, void *arg)
{
this->arg = static_cast<T *>(arg);
function = f;
}
callback function;
T *arg;
};
template <typename T>
class TaskQueue
{
public:
TaskQueue();
~TaskQueue();
// 添加任务
void addTask(Task<T> task);
void addTask(callback f, void *arg);
// 取出任务
Task<T> takeTask();
// 获取当前任务的个数
inline size_t TaskNumber()
{
return m_TaskQ.size();
}
private:
pthread_mutex_t m_mutex;
std::queue<Task<T>> m_TaskQ;
};
template <typename T>
TaskQueue<T>::TaskQueue()
{
pthread_mutex_init(&m_mutex, NULL);
}
template <typename T>
TaskQueue<T>::~TaskQueue()
{
pthread_mutex_destroy(&m_mutex);
}
template <typename T>
void TaskQueue<T>::addTask(Task<T> Task)
{
pthread_mutex_lock(&m_mutex);
m_TaskQ.push(Task);
pthread_mutex_unlock(&m_mutex);
}
template <typename T>
void TaskQueue<T>::addTask(callback f, void *arg)
{
pthread_mutex_lock(&m_mutex);
m_TaskQ.push(Task<T>(f, arg));
pthread_mutex_unlock(&m_mutex);
}
template <typename T>
Task<T> TaskQueue<T>::takeTask()
{
Task<T> tmp;
pthread_mutex_lock(&m_mutex);
if (!m_TaskQ.empty())
{
tmp = m_TaskQ.front();
m_TaskQ.pop();
}
pthread_mutex_unlock(&m_mutex);
return tmp;
}
这是一个C++模板类的实现,用于存储和管理任务队列。任务队列中的每个任务都是一个Task
结构体,包含一个回调函数和一个指向参数的指针。TaskQueue
类提供了添加任务、获取任务和获取任务数量等功能。
以下是TaskQueue
类的详细解释:
Task
结构体:包含一个回调函数function
和一个指向参数的指针arg
。回调函数的类型为void (*)(void *arg)
,即接受一个void *
类型的参数并返回void
的函数指针。TaskQueue
类:包含一个互斥锁m_mutex
和一个任务队列m_TaskQ
。互斥锁用于保护任务队列的访问,确保在多线程环境下的安全性。- 构造函数
TaskQueue()
:初始化互斥锁m_mutex
。 - 析构函数
~TaskQueue()
:销毁互斥锁m_mutex
。 addTask(Task task)
:添加一个任务到任务队列。首先锁定互斥锁,然后将任务添加到队列,最后解锁互斥锁。addTask(callback f, void *arg)
:创建一个新的Task
结构体,并将其添加到任务队列。首先锁定互斥锁,然后创建一个新的Task
结构体并将其添加到队列,最后解锁互斥锁。takeTask()
:从任务队列中取出一个任务。首先锁定互斥锁,然后检查队列是否为空,如果不为空,则取出队列中的第一个任务并将其从队列中移除,最后解锁互斥锁。返回取出的任务。TaskNumber()
:获取任务队列中的任务数量。直接返回队列的大小。
这个TaskQueue
类可以线程池的任务调度和管理。通过将任务添加到任务队列,线程可以从队列中获取任务并执行。
ThreadPool
类
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include "TaskQueue.hpp"
#include <iostream>
#include <string.h>
#include <unistd.h>
template <class T>
class ThreadPool
{
public:
// 创建线程池并进行初始化
ThreadPool(int min, int max);
// 销毁线程池
~ThreadPool();
// 给线程池添加任务
void addTask(Task<T> Task);
// 获取到线程池中工作线程的个数
int getBusyNum();
// 获取到线程池中活着的线程个数
int getLiveNum();
private:
// 工作的线程(消费者线程)任务函数
static void *worker(void *arg);
// 管理者线程任务函数
static void *manager(void *arg);
// 单个线程退出
void threadExit();
private:
// 任务队列
TaskQueue<T> *TaskQ;
pthread_t managerID; // 管理线程的ID
pthread_t *threadIDs; // 线程数组的ID
int minNum; // 最小线程的个数
int maxNum; // 最大线程的个数
int liveNum; // 存活的线程个数
int busyNum; // 忙碌的线程个数
int exitNum; // 销毁的线程个数
pthread_mutex_t mutexPool; // 锁整个线程池
pthread_cond_t notEmpty; // 判断队列是否为空
bool shutdown; // 是否要销毁线程池,销毁为ture,不销毁为false
static const int NUMBER = 2; // 添加线程的个数
};
template <class T>
ThreadPool<T>::ThreadPool(int min, int max)
{
// 实例化任务队列
TaskQ = new TaskQueue<T>;
do
{
if (nullptr == TaskQ)
{
std::cout << "TaskQ malloc failed...\n";
break;
}
threadIDs = new pthread_t[max];
if (nullptr == threadIDs)
{
std::cout << "threadIDs malloc failed..." << '\n';
break;
}
memset(threadIDs, 0, sizeof(pthread_t) * max);
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min;
exitNum = 0;
if (pthread_mutex_init(&mutexPool, NULL) != 0 ||
pthread_cond_init(¬Empty, NULL) != 0)
{
std::cout << "mutex or condition init failed...\n";
break;
}
shutdown = false;
// 创建线程
pthread_create(&managerID, NULL, manager, this);
for (int i = 0; i < min; ++i)
{
pthread_create(&threadIDs[i], NULL, worker, this);
}
return;
} while (0);
// 申请失败释放资源
if (threadIDs)
delete[] threadIDs;
if (TaskQ)
delete TaskQ;
}
template <class T>
ThreadPool<T>::~ThreadPool()
{
// 关闭线程池
std::cout << __FILE__ << ":" << __func__ << ":" << __LINE__ << std::endl;
shutdown = true;
// 阻塞回收管理者线程
pthread_join(managerID, NULL);
// 唤醒阻塞的消费者线程
for (int i = 0; i < liveNum; ++i)
{
pthread_cond_signal(¬Empty);
}
// 释放内存
if (TaskQ)
{
delete TaskQ;
}
if (threadIDs)
{
delete[] threadIDs;
}
pthread_mutex_destroy(&mutexPool);
pthread_cond_destroy(¬Empty);
}
template <class T>
void ThreadPool<T>::addTask(Task<T> Task)
{
if (shutdown)
return;
// 添加任务
TaskQ->addTask(Task);
pthread_cond_signal(¬Empty);
}
template <class T>
int ThreadPool<T>::getBusyNum()
{
pthread_mutex_lock(&mutexPool);
int busyNum = this->busyNum;
pthread_mutex_unlock(&mutexPool);
return busyNum;
}
template <class T>
int ThreadPool<T>::getLiveNum()
{
pthread_mutex_lock(&mutexPool);
int liveNum = this->liveNum;
pthread_mutex_unlock(&mutexPool);
return liveNum;
}
template <class T>
void *ThreadPool<T>::worker(void *arg)
{
ThreadPool *pool = static_cast<ThreadPool *>(arg);
while (true)
{
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空
if (0 == pool->TaskQ->TaskNumber() && !pool->shutdown)
{
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 判断是不是要销毁线程
if (0 < pool->exitNum)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}
}
}
// 判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}
// 从任务队列中取出一个任务
Task<T> Task = pool->TaskQ->takeTask();
pool->busyNum++;
// 解锁
pthread_mutex_unlock(&pool->mutexPool);
std::cout << "thread" << std::to_string(pthread_self()) << "start working...\n";
Task.function(Task.arg);
delete Task.arg;
Task.arg = nullptr;
std::cout << "thread" << std::to_string(pthread_self()) << "end working...\n";
pthread_mutex_lock(&pool->mutexPool);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexPool);
}
return nullptr;
}
template <class T>
void *ThreadPool<T>::manager(void *arg)
{
ThreadPool *pool = static_cast<ThreadPool *>(arg);
while (!pool->shutdown)
{
// 每三秒检测一次
sleep(3);
// 取出线程池中任务的数量和当前线程的数量和工作线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->TaskQ->TaskNumber();
int liveNum = pool->liveNum;
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexPool);
// 添加线程
// 任务个数 > 存活的线程个数 && 存活的线程个数 < 最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < pool->NUMBER && pool->liveNum < pool->maxNum; ++i)
{
if (0 == pool->threadIDs[i])
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁线程
// 忙的线程*2 < 存活的线程数 && 存活的线程数 > 最小的线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return nullptr;
}
template <class T>
void ThreadPool<T>::threadExit()
{
pthread_t tid = pthread_self();
pthread_mutex_lock(&mutexPool);
for (int i = 0; i < maxNum; ++i)
{
if (tid == threadIDs[i])
{
threadIDs[i] == 0;
std::cout << "threadExit() called , " << std::to_string(tid) << " exiting... \n";
break;
}
}
pthread_mutex_unlock(&mutexPool);
pthread_exit(NULL);
}
#endif
这是一个C++模板类的实现,用于创建和管理一个线程池。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池类似于一个任务处理的工厂,将任务添加到队列,然后创建线程来处理这些任务。线程池包含工作线程(worker threads)和一个管理线程(manager thread)。工作线程负责执行任务队列中的任务,管理线程负责根据任务队列的大小动态调整线程池中的线程数量,可以有效地控制线程的数量,避免大量线程之间的竞争导致系统性能下降。
以下是ThreadPool
类的详细解释:
- 构造函数
ThreadPool(int min, int max)
:创建一个线程池并进行初始化。参数min
和max
分别表示线程池中的最小线程数和最大线程数。构造函数中完成任务队列的实例化、线程ID数组的分配、互斥锁和条件变量的初始化、管理线程的创建以及最小数量的工作线程的创建。 - 析构函数
~ThreadPool()
:销毁线程池。设置线程池关闭标志,回收管理线程,唤醒阻塞的工作线程,释放任务队列和线程ID数组的内存,销毁互斥锁和条件变量。 addTask(Task Task)
:向线程池中添加任务。将任务添加到任务队列,并发出条件变量信号以唤醒阻塞的工作线程。getBusyNum()
:获取线程池中忙碌线程的数量。getLiveNum()
:获取线程池中存活线程的数量。worker(void *arg)
:工作线程的任务函数。循环执行以下操作:锁定线程池互斥锁,检查任务队列是否为空,如果为空且线程池未关闭,则阻塞等待任务到来;检查是否需要销毁线程,如果需要且当前存活线程数大于最小线程数,则销毁线程;从任务队列中取出任务并执行,执行完毕后更新忙碌线程数。manager(void *arg)
:管理线程的任务函数。循环执行以下操作:每隔3秒检查一次任务队列的大小、存活线程数和忙碌线程数;根据任务队列的大小和存活线程数判断是否需要添加线程,如果需要则创建新的工作线程;根据忙碌线程数和存活线程数判断是否需要销毁线程,如果需要则设置退出线程的数量,并发出条件变量信号以唤醒阻塞的工作线程。threadExit()
:单个线程退出函数。获取当前线程ID,遍历线程ID数组,找到对应的线程并将其设置为0,然后退出线程。
线程池类还包含以下私有成员变量:
TaskQueue
:任务队列。managerID
:管理线程的ID。threadIDs
:线程数组的ID。minNum
:最小线程的个数。maxNum
:最大线程的个数。liveNum
:存活的线程个数。busyNum
:忙碌的线程个数。exitNum
:销毁的线程个数。mutexPool
:锁整个线程池。notEmpty
:判断队列是否为空的条件变量。shutdown
:是否要销毁线程池,销毁为true,不销毁为false。NUMBER
:添加线程的个数,常量。
线程池的工作原理:
- 创建线程池时,会创建一个管理者线程和一定数量的工作线程。
- 管理者线程负责监控任务队列和工作线程的数量,根据任务数量动态地创建或销毁工作线程。
- 工作线程从任务队列中取出任务并执行,执行完毕后继续等待新任务。
- 当线程池被销毁时,管理者线程会通知所有工作线程退出,并等待它们退出后释放资源。
main.cpp
#include "ThreadPool.hpp"
void TaskFunc(void* arg)
{
int num = *(int*)arg;
std::cout << "thread" << std::to_string(pthread_self()) << "is working, number = "<< num << '\n';
sleep(1);
}
int main()
{
// 创建线程池
ThreadPool<int>* pool =new ThreadPool<int>(3, 5);
for (int i = 0; i < 20; ++i)
{
int* num = new int(i+100);
pool->addTask(Task<int>(TaskFunc, num));
}
sleep(10);
delete pool;
return 0;
}
解决线程处理函数传参问题
使用POSIX
线程库(pthread
)时,线程处理函数只能有一个参数:
#include <stdio.h>
#include <pthread.h>
typedef struct {
int a;
int b;
} thread_args;
void *thread_function(void *arg) {
thread_args *args = (thread_args *)arg;
printf("Thread function called with %d and %d\n", args->a, args->b);
return NULL;
}
int main() {
pthread_t t;
thread_args args = {42, 24};
pthread_create(&t, NULL, thread_function, (void *)&args);
pthread_join(t, NULL);
return 0;
}
在这个例子中,线程处理函数thread_function
只能有一个参数,因此需要将多个参数封装到一个结构体中,并将结构体指针传递给线程处理函数。
线程处理函数的参数数量取决于所使用的编程语言和线程库。在某些情况下,线程处理函数只能有一个参数,但在其他情况下,线程处理函数可以有多个参数。如果线程处理函数只能有一个参数,可以通过将多个参数封装到一个结构体或类中来实现传递多个参数。