目录
23.线程池
23.1 什么是线程池
线程池(Thread Pool)是一种管理和重用线程的机制,在多线程编程中,创建和销毁线程是一项开销较大的操作,因为涉及到操作系统的资源管理和线程上下文切换的成本。线程池通过预先创建一组线程并维护它们,可以避免频繁地创建和销毁线程,从而降低了系统的开销。
- 线程池中的多个已经创建好的线程负责从任务队列当中拿任务,并将拿到的任务进行处理。
- 线程池对外提供一个Push接口,用于让外部将任务Push到任务队列当中
23.2 为什么需要线程池
举个例子:
当需要处理大量的并发任务时,使用线程池能够提高系统的效率和性能 :
- 减少线程创建开销:线程池通过维护一定数量的预创建线程,避免了频繁创建和销毁线程所带来的系统资源消耗。这样一来,系统能够更有效地利用可用的处理器资源和内存,提高整体的资源利用率。
- 任务队列管理:线程池通常配备了任务队列,用于存储需要执行的任务。通过合理管理任务队列,可以实现任务的调度和分配,保证任务的有序执行,提高系统的整体性能。
- 提高代码可维护性:使用线程池能够将线程管理的逻辑与业务逻辑分离,使代码更加清晰简洁,降低了系统的复杂度,提高了代码的可维护性和可读性。
- 控制并发度:线程池允许开发者限制同时执行的线程数量,从而避免系统资源被过度占用,防止系统负载过重,提高系统的稳定性和可靠性。
23.3 线程池的应用场景
线程池适合应对那些需要异步执行、并发处理的任务。适合处理以下类型的任务:
这使得线程池在许多不同的应用场景中都能发挥重要作用。以下是一些常见的应用场景:
-
Web 开发:在 Web 开发中,常常需要处理大量的并发请求,如处理用户请求、数据库查询等。通过使用线程池,可以提高服务器的并发处理能力,减少请求的响应时间。
-
后台任务处理:在后台任务处理中,如批量数据处理、定时任务调度等,使用线程池可以更有效地管理和调度任务,提高系统的性能和效率。
-
图像处理和计算密集型任务:在图像处理、视频编解码、科学计算等领域,常常需要进行大量的计算密集型任务。使用线程池可以并行地处理这些任务,提高处理速度和效率。
-
数据库连接管理:在数据库访问中,通过使用线程池管理数据库连接,可以减少数据库连接的创建和销毁开销,提高数据库访问的性能和效率。
-
并行编程框架:在并行编程框架中,如 Java 的 Fork/Join 框架、Python 的 concurrent.futures 模块等,线程池常常被用来并行地执行任务,以提高程序的性能和扩展性。
23.4 实现一个简单的线程池
23.4.1 RAII风格信号锁
我们可以定义了一个 lockGuard
类,采用 RAII(资源获取即初始化)方式,对互斥锁进行加锁和解锁,确保在作用域结束时自动释放锁。
这里我们创建一个名为lock.hpp:的文件来定义lockGuard类
#pragma once
#include <iostream>
#include <pthread.h>
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx) : mtx_(mtx)
{
pthread_mutex_lock(mtx_);
}
~lockGuard()
{
pthread_mutex_unlock(mtx_);
}
private:
pthread_mutex_t *mtx_; // 指向要管理的互斥锁的指针
};
在lockGuard
类的构造函数中,首先通过传入的pthread_mutex_t
类型的指针初始化mtx_
成员变量,即指向要管理的互斥锁。然后调用pthread_mutex_lock
函数对该互斥锁进行加锁操作。
在lockGuard
类的析构函数中,调用pthread_mutex_unlock
函数对互斥锁进行解锁操作。由于该析构函数在对象生命周期结束时自动调用,因此实现了互斥锁的自动释放。这样,在使用lockGuard
对象时,只需要在作用域中创建该对象,当对象离开作用域时,析构函数会自动调用,从而释放互斥锁,确保了互斥锁的安全管理。
23.4.2 线程的封装
这里我们创建一个名为Thread.hpp的文件来定义Thread类,实现对其的封装.
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
#include <pthread.h> // 使用 pthread 相关函数,需要包含 pthread.h 头文件
// 定义一个函数指针类型,用于作为线程的回调函数
typedef void *(*fun_t)(void *);
// 线程的数据结构,用于传递给线程执行函数
class ThreadData
{
public:
void *args_; // 线程执行函数的参数
std::string name_; // 线程的名称
};
// 线程类,用于创建和管理线程
class Thread
{
public:
// 构造函数,初始化线程对象
Thread(int num, fun_t callback, void *args) : func_(callback)
{
char nameBuffer[64];
snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
name_ = nameBuffer;
tdata_.args_ = args;
tdata_.name_ = name_;
}
// 启动线程
void start()
{
pthread_create(&tid_, nullptr, func_, (void *)&tdata_);
}
// 等待线程结束
void join()
{
pthread_join(tid_, nullptr);
}
// 返回线程的名称
std::string name()
{
return name_;
}
// 析构函数
~Thread()
{
}
private:
std::string name_; // 线程名称
fun_t func_; // 线程执行的回调函数
ThreadData tdata_; // 线程的数据
pthread_t tid_; // 线程ID
};
23.4.3 日志打印
这里我们创建一个名为Log.hpp的文件来实现日志打印保存
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./threadpool.log"
void logMessage(int level, const char *format, ...)
{
#ifndef DEBUG_SHOW
if(level== DEBUG) return;
#endif
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
FILE *fp = fopen(LOGFILE, "a");
fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
fclose(fp);
}
- 这段代码定义了一个函数 logMessage,用于记录日志到一个指定的文件中。
- 日志级别被定义为 DEBUG、NORMAL、WARNING、ERROR 和 FATAL,不同级别的日志会按照不同的格式输出到日志文件中。
- 日志文件名被定义为 ./threadpool.log 。
- 编译时如果未显示定义 -DEBUG_SHOW ,则不会将DEBUG 级别的日志打印进文件
va_list args;
va_start(args, format);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
这段代码片段使用了 C 语言的可变参数函数和相关的宏来格式化日志消息。
22.4.4 定义队列中存放Task类任务
这里我们创建一个名为Task.hpp的文件来定义Task类
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "Log.hpp"
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task(){}
Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
{}
void operator ()(const std::string &name)
{
logMessage(WARNING, "%s处理完成: %d+%d=%d | %s | %d",
name.c_str(), x_, y_, func_(x_, y_), __FILE__, __LINE__);
}
public:
int x_;
int y_;
func_t func_;
};
23.4.5 线程池的实现(懒汉模式)
我们需要实现Push接口向任务队列里面放任务,又要Pop任务给已经创建好的线程,其本质就是一个生产者-消费者模型
这里我们创建一个名为ThreadPool.hpp的文件来定义ThreadPool类
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "Thread.hpp" // 导入线程类头文件
#include "Lock.hpp" // 导入锁相关的头文件
#include "Log.hpp" // 导入日志相关的头文件
const int g_thread_num = 3; // 默认线程池中线程数量
// 线程池类模板
template <class T>
class ThreadPool
{
public:
pthread_mutex_t *getMutex() // 获取互斥锁指针
{
return &lock;
}
bool isEmpty() // 判断任务队列是否为空
{
return task_queue_.empty();
}
void waitCond() // 等待条件变量
{
pthread_cond_wait(&cond, &lock);
}
T getTask() // 获取任务
{
T t = task_queue_.front(); // 获取队首任务
task_queue_.pop(); // 移除队首任务
return t;
}
private:
ThreadPool(int thread_num = g_thread_num) : num_(thread_num) // 构造函数
{
pthread_mutex_init(&lock, nullptr); // 初始化互斥锁
pthread_cond_init(&cond, nullptr); // 初始化条件变量
for (int i = 1; i <= num_; i++)
{
threads_.push_back(new Thread(i, routine, this)); // 创建线程对象并加入线程池
}
}
ThreadPool(const ThreadPool<T> &other) = delete; // 禁止拷贝构造函数
const ThreadPool<T> &operator=(const ThreadPool<T> &other) = delete; // 禁止赋值运算符
public:
static ThreadPool<T> *getThreadPool(int num = g_thread_num) // 获取线程池单例
{
if (nullptr == thread_ptr) // 如果线程池指针为空
{
lockGuard lockguard(&mutex); // 创建锁保护
if (nullptr == thread_ptr)
{
thread_ptr = new ThreadPool<T>(num); // 创建线程池对象
}
}
return thread_ptr; // 返回线程池指针
}
void Threadinit() // 初始化线程池中的线程
{
for (auto &iter : threads_) // 遍历线程池中的线程对象
{
iter->start(); // 启动线程
logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功"); // 记录日志
}
}
static void *routine(void *args) // 线程执行函数
{
ThreadData *td = (ThreadData *)args; // 获取线程数据
ThreadPool<T> *tp = (ThreadPool<T> *)td->args_; // 获取线程池指针
while (true)
{
T task;
{
lockGuard lockguard(tp->getMutex()); // 加锁
while (tp->isEmpty()) // 如果任务队列为空
tp->waitCond(); // 等待条件变量
task = tp->getTask(); // 获取任务
}
task(td->name_); // 执行任务
}
}
void pushTask(const T &task) // 添加任务到任务队列
{
lockGuard lockguard(&lock); // 加锁
task_queue_.push(task); // 添加任务到队列
pthread_cond_signal(&cond); // 发送信号唤醒线程
}
~ThreadPool() // 析构函数
{
for (auto &iter : threads_) // 遍历线程池中的线程对象
{
iter->join(); // 等待线程结束
delete iter; // 删除线程对象
}
pthread_mutex_destroy(&lock); // 销毁互斥锁
pthread_cond_destroy(&cond); // 销毁条件变量
}
private:
std::vector<Thread *> threads_; // 线程对象的容器
int num_; // 线程数量
std::queue<T> task_queue_; // 任务队列
static ThreadPool<T> *thread_ptr; // 线程池指针
static pthread_mutex_t mutex; // 互斥锁
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件变量
};
// 初始化线程池指针为nullptr
template <typename T>
ThreadPool<T> *ThreadPool<T>::thread_ptr = nullptr;
// 初始化互斥锁
template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
- 线程池中的任务队列是会被多个执行流同时访问的临界资源,因此我们需要引入互斥锁对任务队列进行保护。
- 线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此线程池当中的线程在拿任务之前,需要先判断任务队列当中是否有任务,若此时任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此我们需要引入条件变量。
- 当外部线程向任务队列中Push一个任务后,此时可能有线程正处于等待状态,因此在新增任务后需要唤醒在条件变量下等待的线程。
注意:
-
使用
while
进行条件判断:在使用条件变量等待线程唤醒时,应该使用while
循环而不是if
语句进行条件判断。这是因为唤醒线程可能会出现伪唤醒(spurious wakeup)的情况,或者是其他导致唤醒的条件。使用while
循环可以确保线程在被唤醒后重新检查条件,以防止出现虚假唤醒的情况。 -
避免惊群效应:确实,使用
pthread_cond_broadcast
可能会导致惊群效应,即一次性唤醒大量线程,而实际上只有一个线程能够获取到任务。为了避免这种情况,最好使用pthread_cond_signal
函数唤醒一个等待的线程即可,这样可以减少不必要的竞争和资源浪费。 -
在解锁之后处理任务:正确的做法是在拿到任务后先解锁,然后再处理任务。这样可以确保其他线程能够及时获取到任务并开始处理,而不会因为一个线程长时间持有锁而导致其他线程无法执行的情况。将任务处理过程放到解锁之前可能会导致资源的浪费和效率降低。
-
避免串行执行:如果将处理任务的过程放到临界区当中,那么当某一线程从任务队列拿到任务后,其他线程还需要等待该线程将任务处理完后,才有机会进入临界区。此时虽然是线程池,但最终我们可能并没有让多线程并行的执行起来。
当使用pthread_create
函数创建线程时,需要提供一个C风格的函数指针作为线程的执行例程。然而,在C++中,类的成员函数会隐式地接收一个指向该类对象的指针作为第一个参数(即this
指针)。因此,如果直接将类的成员函数作为线程的执行例程,会导致函数签名不匹配,无法通过编译。
为了解决这个问题,一种常见的方法是将线程执行例程设置为静态方法。静态方法不会隐式接收this
指针作为参数,而是直接通过类名调用。这样,就可以在pthread_create
函数中传递一个静态成员函数的指针作为线程的执行例程。同时,静态方法也具有与类对象实例无关的特性,因此可以被多个线程共享。
然而,静态方法无法直接访问非静态成员函数和变量。如果在执行例程中需要访问类的非静态成员函数或变量(比如Pop),可以通过在创建线程时向执行例程传递类对象的指针来实现。这样,在执行例程中就可以使用该指针来调用类的非静态成员函数或访问非静态成员变量。
私有化构造函数的主要原因是确保线程池类 ThreadPool<T>
的单例模式。通过私有化构造函数,外部无法直接创建新的线程池实例,而是必须通过静态方法 getThreadPool()
来获取线程池的唯一实例。
23.4.6 单例模式
单例(Singleton)模式,是一种常用的软件设计模式。在它的核心结构中只包含一个被称为单例的特殊类。通过单例模式可以保证系统中,应用该模式的类一个类只有一个实例。即一个类只有一个对象实例 ;
饿汉方式实现单例模式
template <typename T>
class Singleton
{
private:
static Singleton<T> data;//饿汉模式,在加载的时候对象就已经存在了
public:
static Singleton<T>* GetInstance()
{
return &data;
}
};
该模式能简单快速的创建一个单例对象,而且是线程安全的(只在类加载时才会初始化,以后都不会)。但它有一个缺点,就是不管你要不要都会直接创建一个对象,会消耗一定的性能(当然很小很小,几乎可以忽略不计,所以这种模式在很多场合十分常用而且十分简单)
懒汉方式实现单例模式
该模式只在你需要对象时才会生成单例对象(比如调用GetInstance方法)
template <typename T>
class Singleton
{
private:
static Singleton<T>* inst; //懒汉式单例,只有在调用GetInstance时才会实例化一个单例对象
public:
static Singleton<T>* GetInstance()
{
if (inst == NULL)
{
inst = new Singleton<T>();
}
return inst;
}
};
看上去,这段代码没什么明显问题,但它不是线程安全的。假设当前有多个线程同时调用GetInstance()方法,由于当前还没有对象生成,那么就会由多个线程创建多个对象。
// 懒汉模式, 线程安全
template <typename T>
class Singleton
{
private:
static Singleton<T>* inst;
static std::mutex lock;
public:
static T* GetInstance()
{
if (inst == NULL) // 双重判定空指针, 降低锁冲突的概率, 提高性能
{
lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new
if (inst == NULL)
{
inst = new T();
}
lock.unlock();
}
return inst;
}
};
这种形式是在懒汉方式的基础上增加的,当多个线程调用GetInstance方法时,此时类中没有对象,那么多个线程就会来到锁的位置,竞争锁。必然只能有一个线程竞争锁成功,此时再次判断有没有对象被创建(就是inst指针),如果没有就会new一个对象,如果有就会解锁,并返回已有的对象;总的来说,这样的形式使得多个线程调用GetInstance方法时,无论成功与否,都会有返回值
23.4.7 主程序实现
#include"Log.hpp"
#include"ThreadPool.hpp"
#include"Task.hpp"
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
ThreadPool<Task>::getThreadPool()->Threadinit();
while(true)
{
int x = rand()%100 + 1;
usleep(7721);
int y = rand()%30 + 1;
Task t(x, y, [](int x, int y)->int{
return x + y;
});
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
logMessage(DEBUG, "制作任务完成: %d+%d=?", x, y);
ThreadPool<Task>::getThreadPool()->pushTask(t);
sleep(1);
}
return 0;
}
22.4.8 程序运行效果
程序运行后,我们打开路径下自动新建得threadpool.log文件,可以看到程序自动打印的日志