文章目录
一、线程池
// ThreadPool.hpp
#pragma once
#include <pthread.h>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include <unordered_map>
struct ThreadInfo
{
pthread_t tid_; // 线程 ID
std::string name_; // 线程的名字
};
template <class T>
class ThreadPool
{
static const int defaultnum = 5;
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Weakup()
{
pthread_cond_signal(&cond_);
}
void Sleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsTaskQueueEmpty()
{
return tasks_.empty();
}
T PopTasks()
{
T task = tasks_.front();
tasks_.pop();
return task;
}
const std::string &GetThreadName(pthread_t tid)
{
return um_[tid];
}
public:
ThreadPool(int thread_num = defaultnum)
:threads_(thread_num), thread_num_(thread_num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
static void *Routine(void *args)
{
ThreadPool *tp = static_cast<ThreadPool*>(args);
std::string name = tp->GetThreadName(pthread_self());
while(true)
{
tp->Lock();
while(tp->IsTaskQueueEmpty())
{
tp->Sleep();
}
T task = tp->PopTasks();
tp->Unlock();
task.run(); // 此时这个任务已经属于该线程私有的了,所以对任务的处理工作可以在解锁之后进行
printf("%s is running----%s\n", name.c_str(), task.result_to_string().c_str());
}
}
void start()
{
for(int i = 0; i < thread_num_; i++)
{
threads_[i].name_ = "Thread-" + std::to_string(i);
pthread_create(&(threads_[i].tid_), nullptr, Routine, this);
um_[threads_[i].tid_] = threads_[i].name_;
}
}
void push(const T& task)
{
Lock();
tasks_.push(task);
Weakup();
Unlock();
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
std::vector<ThreadInfo> threads_; // 一批线程
int thread_num_; // 线程池中的线程的数量
std::queue<T> tasks_; // 任务
pthread_mutex_t mutex_; // 定义一把锁,让所有线程保持互斥
pthread_cond_t cond_; // 定义一个条件变量,让线程同步
std::unordered_map<pthread_t, std::string> um_; // 一个 map 用来快速查找一个线程的name
};
注意:Routine
方法如果要写在函数内部,那么一定得是 static
的。因为 pthread_creat
函数的参数要求 Routine
必须是 void *(*)(void *)
类型。而非 static
的成员函数其第一个参数是隐藏的 this
指针,不符合要求。将 Routine
设置为 static
,此时又会有新的问题产生,就是,在静态成员函数中无法访问到非静态成员,为了解决这个问题,我们将 this
指针做为线程函数的参数传递过去,此时就可以在 Routine
函数中去调用非静态的成员。
// main.cpp
#include "ThreadPool.hpp"
#include "Task.h"
#include <unistd.h>
#include <iostream>
using namespace std;
const std::string opers = "+-*/%";
int main()
{
srand((unsigned int)time(nullptr));
ThreadPool<Task> *tp = new ThreadPool<Task>(5);
tp->start();
int len = opers.size();
while(true)
{
//1. 构建任务
// 模拟获取数据
int data1 = rand() % 10 + 1; // [1, 10]
usleep(10);
int data2 = rand() % 13; // [0, 13]
usleep(10);
char op = opers[rand() % len];
Task task(data1, data2, op);
//2. 交给线程池处理
printf("main thread push a task: %s\n", task.get_task().c_str());
tp->push(task);
usleep(100000);
}
return 0;
}
二、封装线程
// Thread.hpp
#pragma once
#include <pthread.h>
#include <string>
typedef void (*callback_t)();
class Thread
{
public:
static void *Routine(void *args)
{
Thread *th = static_cast<Thread*>(args);
th->Entery();
}
Thread(callback_t func)
:tid_(0), name_(""), start_time_(0), isrunning_(false), func_(func)
{}
void Run()
{
name_ = "Thread-"+std::to_string(num);
start_time_ = time(nullptr);
isrunning_ = true;
pthread_create(&tid_, nullptr, Routine, this);
}
void Join()
{
pthread_join(tid_, nullptr);
isrunning_ = false;
}
pthread_t get_tid()
{
return tid_;
}
std::string get_name()
{
return name_;
}
uint64_t get_start_time()
{
return start_time_;
}
bool is_running()
{
return isrunning_;
}
void Entery()
{
func_();
}
~Thread()
{}
private:
pthread_t tid_;
std::string name_;
uint64_t start_time_; // 线程启动时间
bool isrunning_;
callback_t func_; // 线程将来要执行的函数, 回调函数
static int num; // 线程编号
};
int Thread::num = 1;
线程封装的好处在于,可以像使用对象一样去使用线程,如下:
// main.cc
#include "Thread.hpp"
#include <cstdio>
#include <unistd.h>
#include <iostream>
#include <cstdio>
#include <vector>
using namespace std;
void Print()
{
while(true)
{
printf("Hello Linux\n");
sleep(1);
}
}
int main()
{
vector<Thread> threads;
for(int i = 0; i < 5; i++)
{
threads.push_back(Thread(Print));
}
for(auto &thread : threads)
{
thread.Run();
}
for(auto &thread : threads)
{
thread.Join();
}
return 0;
}
三、STL,智能指针和线程安全
3.1 STL 中的容器是否是线程安全的?
不是,因为 STL 设计的初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。而且对于不同的容器,加锁方式不同,性能可能也不同(例如 hash
表的锁表和锁桶)。因此 STL 默认不是线程安全的。如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。
3.2 智能指针是否是线程安全的?
对于 unique_ptr
,由于只是在当前代码范围内生效,因此不涉及线程安全问题。对于 shared_ptr
,多个对象需要共用一个引用计数变量,所以会存在线程安全问题。但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS
)的方式保证 shared_ptr
能够高效,原子的操作引用计数。
四、线程安全的单例模式
4.1 什么是单例模式?
单例模式是一种“经典的,常用的“设计模式。所谓设计模式就是,对一些经典的常见场景,给定了一些应对的解决方案,这个就是设计模式**。某些类,只应该具有一个对象(实例),就称之为单例**,在很多服务器开发场景中,经常需要让服务器加载很多数据(上百G)到内存中,此时往往要用一个单例的类来管理这些数据。
4.2 饿汉和懒汉
4.2.1 饿汉方式实现单例
饿汉方式:就是吃完饭,立刻洗完,因为下一顿吃的时候可以立刻拿着碗就能吃饭。
template <typename T>
class Singleton
{
static T data;
public:
static T* GetInstance()
{
return &data;
}
};
小Tips:全局变量和静态变量,在程序加载的时候就已经被创建好了,局部对象实在程序运行过程中进行加载的。
饿汉模式,在 Singleton
类中定义了一个 T
类型的成员静态成员变量,因为是静态的,无论创建多少个 Singleton
对象,最终都会只有一个 T
类型的对象,并且因为是静态的,在程序被加载的时候就已经被创建出来了,后面在需要的时候直接使用即可。
4.2.2 懒汉方式实现单例
懒汉方式:就是吃完饭,先把碗放下,然后下一顿饭用到这个碗了再洗碗,这就是懒汉方式。懒汉方式最核心的思想是 “延时加载”,从能够优化服务器的启动速度。其实从整个过程来看,懒汉模式并不会提高太多效率,因为最终该加载的东西还是需要花时间去进行加载,饿汉模式采用立即加载的方式,前期可能会慢一点,而懒汉模式前期只是去申请资源,并没有实际的去加载资源,而是等待需要的时候再进行加载,所以说,懒汉方式可以优化启动速度。
template <typename T>
class Singleton
{
static T* inst;
public:
static T* GetInstance()
{
if (inst == NULL)
{
inst = new T();
}
return inst;
}
};
懒汉模式中,Singeton
类中是一个静态的指针,因为是指针,所以在程序加载的时候并不会立即就创建出一个 T
类型的对象,而是在需要的时候,调用 GetInstance
创建对象。但是这样做存在线程安全问题,第一次调用 Getlnstance
的时候,如果两个线程同时调用,可能会创建出两份 T
对象的实例,但是后续再次调用,就没有问题了。
4.3 基于懒汉方式实现的单例线程池
#pragma once
#include <pthread.h>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include <unordered_map>
struct ThreadInfo
{
pthread_t tid_; // 线程 ID
std::string name_; // 线程的名字
};
template <class T>
class ThreadPool
{
static const int defaultnum = 5;
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Weakup()
{
pthread_cond_signal(&cond_);
}
void Sleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsTaskQueueEmpty()
{
return tasks_.empty();
}
T PopTasks()
{
T task = tasks_.front();
tasks_.pop();
return task;
}
const std::string &GetThreadName(pthread_t tid)
{
return um_[tid];
}
public:
static void *Routine(void *args)
{
ThreadPool *tp = static_cast<ThreadPool *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsTaskQueueEmpty())
{
tp->Sleep();
}
T task = tp->PopTasks();
tp->Unlock();
task.run(); // 此时这个任务已经属于该线程私有的了,所以对任务的处理工作可以在解锁之后进行
printf("%s is running----%s\n", name.c_str(), task.result_to_string().c_str());
}
}
void start()
{
for (int i = 0; i < thread_num_; i++)
{
threads_[i].name_ = "Thread-" + std::to_string(i);
pthread_create(&(threads_[i].tid_), nullptr, Routine, this);
um_[threads_[i].tid_] = threads_[i].name_;
}
}
void push(const T &task)
{
Lock();
tasks_.push(task);
Weakup();
Unlock();
}
static ThreadPool<T> *GetInstance() // 指正通过该接口获取一个单例对象
{
if (ptp_ == nullptr)
{
pthread_mutex_lock(&smutex_);
if (ptp_ == nullptr)
{
printf("log: singleton creat done first!\n");
ptp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&smutex_);
}
return ptp_;
}
private:
ThreadPool(int thread_num = defaultnum)
: threads_(thread_num), thread_num_(thread_num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
ThreadPool(const ThreadPool<T> &tp) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;
private:
std::vector<ThreadInfo> threads_; // 一批线程
int thread_num_; // 线程池中的线程的数量
std::queue<T> tasks_; // 任务
pthread_mutex_t mutex_; // 定义一把锁,让所有线程保持互斥
pthread_cond_t cond_; // 定义一个条件变量,让线程同步
std::unordered_map<pthread_t, std::string> um_; // 一个 map 用来快速查找一个线程的name
static ThreadPool<T> *ptp_; // 单例
static pthread_mutex_t smutex_; // 定义一把静态的锁
};
template <class T>
ThreadPool<T> *ThreadPool<T>::ptp_ = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::smutex_ = PTHREAD_MUTEX_INITIALIZER;
相较于上面第九节中的普通版线程池,这一版基于懒汉方式实现的单例线程池,主要改动的地方在:增加一个 static static ThreadPool<T> *
类型的指针,指向一个线程池;将所有可能创建对象的成员函数私有,这样就无法在类外直接创建线程池对象,将拷贝构造和运算符重载直接禁掉;提供一个静态的成员方法,供外部来获取一个单例的进程池对象。
在最开始这个单例对象还没被创建的时候,可能存在多个线程同时去获取,此时可能有多个线程判断 ptp_ == nullptr
都成立,所以可能会创建出多份线程池对象,所以在判断前需要先加锁,因为需要再静态成员函数里面使用锁,所以我们需要定义一个静态的锁。但是,仔细想想,只有在对象没有被创建出来的时候,才可能存在多个线程同时去获取,可能会创建多个对象,只要该对象被创建出来了,之后无论同时再来多少线程都不会去创建对象,但是因为加锁了,所以后面来的多个线程都只能去串行的申请线程池,此时再看加锁操作,仿佛就显得有点多此一举,因此我们在加锁前可以再进行以判断,看 ptp_
是否等于空,只有当其等于空的时候,再进行加锁,这样做级既可以保证不会创建出多个线程池对象,也不会因为加锁导致效率下降。
线程池使用:
#include "ThreadPool.hpp"
#include "Task.h"
#include <unistd.h>
#include <iostream>
using namespace std;
const std::string opers = "+-*/%";
int main()
{
printf("main thread is start!...\n");
sleep(3);
srand((unsigned int)time(nullptr));
// ThreadPool<Task> *tp = ThreadPool<Task>::GetInstance(); // 获取单例对象
ThreadPool<Task>::GetInstance()->start(); // 获取一个单例对象
int len = opers.size();
while(true)
{
//1. 构建任务
// 模拟获取数据
int data1 = rand() % 10 + 1; // [1, 10]
usleep(10);
int data2 = rand() % 13; // [0, 13]
usleep(10);
char op = opers[rand() % len];
Task task(data1, data2, op);
//2. 交给线程池处理
printf("main thread push a task: %s\n", task.get_task().c_str());
ThreadPool<Task>::GetInstance()->push(task);
usleep(1000000);
}
return 0;
}
五、其他常见的各种锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁、写锁、行锁等),当其它线程想要访问数据时,被阻塞挂起。
- 乐观锁: 每次取数据的时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他线程在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和
CAS
操作。 CAS
操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。- 自旋锁:当一个进程申请锁失败时,不是将自己挂起,而是继续去申请锁。使用这种锁的前提是,线程在临界区中执行的时间要足够的短。可以通过
pthread_mutex_trylock
配合while
循环实现一个自旋锁。pthread
库也为我们提供了自旋锁pthread_spinlock_t
六、读者写者问题
6.1 读写锁
在多线程场景下,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的过程,中间耗时很长,给这种代码段加锁,会极大的降低程序效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢?答案是有,采用读写锁。
pthread
库也为我们准备了读写锁 pthread_rwlock_t
,其接口和互斥量的一样,只不过加锁的接口有所不同,对读者来说,加锁使用 pthread_rwlock_rdlock
接口,对写者来说,使用 pthread_rwlock_wrlock
进行加锁,读写者释放锁都采用同一个接口 pthread_rwlock_unlock
进行释放。
读写者模型的同步策略:采用读写锁的前提是,存在非常多的读者,而只有少量的写者,这就必定会导致大部分时间都是读者在执行相应的代码,写者处于一种饥饿状态,这种情况也叫做读者优先。我们可以通过设置写者优先同步策略来解决这种问题,当读者和写者都在等待锁的时候,当锁被释放的时候,让写者拿到锁。
七、结语
今天的分享到这里就结束啦!如果觉得文章还不错的话,可以三连支持一下,春人的主页还有很多有趣的文章,欢迎小伙伴们前去点评,您的支持就是春人前进的动力!