目录
1. 线程池
模块一:线程的封装
#ifndef __THREAD_HPP_
#define __THREAD_HPP_
#include <iostream>
#include <pthread.h>
#include <string>
#include "Log.hpp"
const int BUFFER_SIZE = 64;
typedef void*(*Tfunc_t)(void*);
namespace Xq
{
class thread_info
{
public:
thread_info(const std::string& name = std::string (), void* arg = nullptr)
:_name(name)
,_arg(arg)
{}
void set_info(const std::string& name, void* arg)
{
_name = name;
_arg = arg;
}
std::string& get_name()
{
return _name;
}
void*& get_arg()
{
return _arg;
}
private:
std::string _name;
void* _arg;
};
class thread
{
public:
thread(size_t num, Tfunc_t func, void* arg)
:_func(func)
,_arg(arg)
{
// 构造线程名
char buffer[BUFFER_SIZE] = {0};
snprintf(buffer, BUFFER_SIZE, "%s %ld", "thread", num);
_name = buffer;
// 设置线程所需要的信息, 线程名 + _arg
_all_info.set_info(_name, _arg);
}
// 创建线程
void create(void)
{
pthread_create(&_tid, nullptr, _func, static_cast<void*>(&_all_info));
//std::cout << "创建线程: " << _name << " success" << std::endl;
LogMessage(NORMAL, "%s: %s %s", "创建线程", _name.c_str(), "success");
}
pthread_t get_tid()
{
return _tid;
}
private:
std::string _name; // 线程名
Tfunc_t _func; // 线程的回调
pthread_t _tid; //线程ID
thread_info _all_info; // 装载的是 线程名 + _arg;
// 线程参数, 未来我们会将其和线程名封装到一起(thread_info),整体传递给线程
void* _arg;
};
}
#endif
模块二:线程池的封装
#ifndef __THREADPOOL_HPP_
#define __THREADPOOL_HPP_
#include <iostream>
#include <vector>
#include <queue>
#include <ctime>
#include "Thread.hpp"
#include "LockGuard.hpp"
#include "Task.hpp"
#include "Log.hpp"
#include "Date.hpp"
#include <unistd.h>
#define TNUM 3
namespace Xq
{
template<class T>
class thread_pool
{
public:
pthread_mutex_t* get_lock()
{
return &_lock;
}
bool is_que_empty()
{
return _task_que.empty();
}
void wait_cond(void)
{
pthread_cond_wait(&_cond, &_lock);
}
T get_task(void)
{
T task = _task_que.front();
_task_que.pop();
return task;
}
public:
// 线程池的构造函数负责实例化线程对象
thread_pool(size_t tnum = TNUM)
:_tnum(tnum)
{
for(size_t i = 0; i < _tnum; ++i)
{
// 这里只是创建了我们封装的线程对象
// 事实上, 在这里,线程并没有真正被创建
//_VPthread.push_back(new thread(i, routine, nullptr));
// 为了解决静态成员函数无法访问非静态的成员方法和成员属性
// 因此我们在这里可以传递线程池对象的地址,即this指针
// 我们在这里是可以传递this指针的
// 因为走到这里,此时的线程池对象已经存在 (空间已经开辟好了)
_VPthread.push_back(new thread(i, routine, this));
}
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
// 启动所有线程池中的线程
void run_all_thread(void)
{
for(const auto& vit : _VPthread)
{
vit->create();
}
}
// 线程的回调
/*
* 首先 routine函数是线程池中的线程的回调函数
* 任务是获取任务表的任务,处理任务(消费过程,消费任务)
* 如果routine设计成了类的非静态成员函数,那么其第一个
* 参数就是this指针, 与线程的回调函数类型不匹配。
* 因此我们在这里可以将其设置为静态函数
*/
/*
* 当我们用static可以解决了上面的问题时(非静态成员函数的第一个参数是this指针)
* 新问题又产生了,由于静态成员函数没有隐藏的this指针
* 故静态成员方法无法访问成员属性和成员方法
* 而我们的routine是用来消费任务表中的任务的
* 换言之,它需要访问线程池中的成员属性(任务表)
* 可是,此时的routine没有能力访问,如何解决这个问题?
* 我们可以将线程池对象的this指针作为参数传递给我们封装的线程对象
* 让特定线程通过this指针访问任务表
*/
static void* routine(void* arg)
{
thread_info* info = static_cast<thread_info*>(arg);
// 获得this指针
thread_pool<T>* TP = static_cast<thread_pool<T>*>(info->get_arg());
// 通过this指针访问任务表
// 之前说了, 此时的任务表是一个临界资源
// 因此,我们需要保证它的安全性以及访问合理性问题
while(true)
{
T task;
{
lock_guard lg(TP->get_lock());
while(TP->is_que_empty())
TP->wait_cond();
// 走到这里, 临界资源一定是就绪的
// 即任务表中一定有任务
// 获取任务
task = TP->get_task();
}
// 走到这里, 释放锁资源, 执行任务
task(info->get_name());
}
return nullptr;
}
// 生产任务, 相当于生产者, 我们想让主线程充当生产者角色
// 生产过程本质上是向任务表生产任务
// 在未来,当我们在向任务表push任务的时候,
// 很有可能出现,(线程池中的) 若干个线程想要从任务表拿任务 (pop),
// 因此,该任务表就是一个被多执行流共享的资源 (临界资源),
// 因此我们必须要保障它的安全问题,因此我们就可以用锁保证它的安全性
void push_task(const T& task)
{
lock_guard lg(&_lock); // RAII风格的锁
_task_que.push(task);
// 当我们将任务生产到任务表中后
// 我们就可以唤醒相应的线程(线程池中的线程)来消费任务
pthread_cond_signal(&_cond);
}
void join()
{
for(const auto& vit : _VPthread)
{
pthread_join(vit->get_tid(), nullptr);
}
}
~thread_pool()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
for(const auto& vit : _VPthread)
{
delete vit;
}
}
private:
size_t _tnum; // 线程个数
std::vector<thread*> _VPthread; // 线程对象表
std::queue<T> _task_que; // 任务表
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
}
#endif
模块三:互斥量的封装 (RAII风格)
#ifndef __LOCKGUARD_HPP_
#define __LOCKGUARD_HPP_
#include <pthread.h>
namespace Xq
{
class mutex
{
public:
mutex(pthread_mutex_t* lock):_lock(lock){}
void lock(void){
pthread_mutex_lock(_lock);
}
void unlock(void){
pthread_mutex_unlock(_lock);
}
private:
pthread_mutex_t* _lock;
};
class lock_guard
{
public:
lock_guard(pthread_mutex_t* lock)
:_mtx(lock)
{
_mtx.lock();
}
~lock_guard()
{
_mtx.unlock();
}
private:
mutex _mtx;
};
}
#endif
模块四:任务的封装
#ifndef __TASK_HPP_
#define __TASK_HPP_
#include <iostream>
#include <functional>
#include <string>
#include "Log.hpp"
namespace Xq
{
class Task
{
public:
Task() {}
Task(int x, int y, std::function<int(int,int)> func)
:_x(x)
,_y(y)
, _func(func)
{}
void operator()(const std::string& name)
{
//std::cout << name << " 正在执行任务: " << _x << " + " << _y << " = " <<
//_func(_x, _y) << std::endl;
LogMessage(NORMAL, "%s %s %d + %d = %d", name.c_str(), "正在执行任务", _x, _y, _func(_x,_y)) ;
}
private:
int _x;
int _y;
std::function<int(int,int)> _func;
};
}
#endif
模块五:日志的封装
void va_start(va_list ap, last_arg);
void va_end(va_list ap)
#include <stdarg.h>
int vprintf(const char *format, va_list ap);
int vfprintf(FILE *stream, const char *format, va_list ap);
int vsprintf(char *str, const char *format, va_list ap);
int vsnprintf(char *str, size_t size, const char *format, va_list ap);
void demo(const char* format, ...)
{
va_list ap;
va_start(ap, format);
vprintf(format,ap);
va_end(ap);
}
int main()
{
// 这里用户可以自定义信息, 与使用printf几乎一致。
demo("%s %f %c %d\n", "haha", 3.14, 'x', 111 );
return 0;
}
#pragma once
#include "Date.hpp"
#include <iostream>
#include <map>
#include <string>
#include <cstdarg>
#define LOG_SIZE 1024
// 日志等级
enum Level
{
DEBUG, // DEBUG信息
NORMAL, // 正常
WARNING, // 警告
ERROR, // 错误
FATAL // 致命
};
void LogMessage(int level, const char* format, ...)
{
// 如果想打印DUBUG信息, 那么需要定义DUBUG_SHOW (命令行定义, -D)
#ifndef DEBUG_SHOW
if(level == DEBUG)
return ;
#endif
std::map<int, std::string> level_map;
level_map[0] = "DEBUG";
level_map[1] = "NORAML";
level_map[2] = "WARNING";
level_map[3] = "ERROR";
level_map[4] = "FATAL";
std::string info;
va_list ap;
va_start(ap, format);
char stdbuffer[LOG_SIZE] = {0}; // 标准部分 (日志等级、日期、时间)
snprintf(stdbuffer, LOG_SIZE, "%s, %s, %s\n", level_map[level].c_str(), Xq::Date().get_date().c_str(), Xq::Time().get_time().c_str());
info += stdbuffer;
char logbuffer[LOG_SIZE] = {0}; // 用户自定义部分
vsnprintf(logbuffer, LOG_SIZE, format, ap);
info += logbuffer;
std::cout << info << std::endl;
va_end(ap);
}
模块六:时间的封装
#ifndef __DATE_HPP_
#define __DATE_HPP_
#include <iostream>
#include <ctime>
namespace Xq
{
class Date
{
public:
Date(size_t year = 1970, size_t month = 1, size_t day = 1)
:_year(year)
,_month(month)
,_day(day)
{}
std::string& get_date()
{
size_t num = get_day();
while(num--)
{
operator++();
}
char buffer[32] = {0};
snprintf(buffer, 32, "%ld/%ld/%ld", _year,_month, _day);
_data = buffer;
return _data;
}
private:
Date& operator++()
{
size_t cur_month_day = month_day[_month];
if((_month == 2) && ((_year % 400 == 0 )|| (_year % 4 == 0 && _year % 100 != 0)))
++cur_month_day;
++_day;
if(_day > cur_month_day)
{
_day = 1;
_month++;
if(_month > 12)
{
_month = 1;
++_year;
}
}
return *this;
}
// 获得从1970.1.1 到 今天相差的天数
size_t get_day()
{
return (time(nullptr) + 8 * 3600) / (24 * 60 * 60);
}
private:
size_t _year;
size_t _month;
size_t _day;
static int month_day[13];
std::string _data;
};
int Date::month_day[13] = {
0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31
};
class Time
{
public:
Time(size_t hour = 0, size_t min = 0, size_t second = 0)
:_hour(hour)
,_min(min)
,_second(second)
{}
std::string& get_time()
{
size_t second = time(nullptr) + 8 * 3600;
_hour = get_hour(second);
_min = get_min(second);
_second = get_second(second);
char buffer[32] = {0};
snprintf(buffer, 32, "%ld:%ld:%ld", _hour, _min, _second);
_time = buffer;
return _time;
}
private:
size_t get_hour(time_t second)
{
// 不足一天的剩余的秒数
size_t verplus_second = second % (24 * 60 * 60);
return verplus_second / (60 * 60);
}
size_t get_min(time_t second)
{
// 不足一小时的秒数
size_t verplus_second = second % (24 * 60 * 60) % (60 * 60);
return verplus_second / 60;
}
size_t get_second(time_t second)
{
// 不足一分钟的秒数
return second % (24 * 60 * 60) % (60 * 60) % 60;
}
private:
size_t _hour;
size_t _min;
size_t _second;
std::string _time;
};
}
#endif
模块六:主函数
#include "ThreadPool.hpp"
int main()
{
//std::cout << "hello thread pool " << std::endl;
srand((unsigned int)time(nullptr) ^ getpid());
Xq::thread_pool<Xq::Task>* TP = new Xq::thread_pool<Xq::Task>();
TP->run_all_thread();
while(true)
{
int x = rand() % 100;
int y = rand() % 50;
TP->push_task(Xq::Task(x, y, [](int x, int y)->int{
return x + y;
}));
// std::cout << "Main thread: " << "生产任务: " << x << " + " << y << " = " << " ? " << std::endl;
LogMessage(NORMAL, "%s%d + %d = ?", "Main thread 生产任务:", x, y);
sleep(1);
}
TP->join();
return 0;
}
模块七: Makefile
TP:TestMain.cc
g++ -o $@ $^ -std=gnu++11 -l pthread
.PHONY:clean
clean:
rm -f TP
2. 设计模式
namespace Xq
{
template<class T>
class thread_pool
{
public:
pthread_mutex_t* get_lock() { // 省略, 跟原来一致 }
bool is_que_empty() { // 省略, 跟原来一致 }
void wait_cond(void) { // 省略, 跟原来一致 }
T get_task(void) { // 省略, 跟原来一致 }
private:
// 将构造私有化, 用于实现单例
// 线程池的构造函数负责实例化线程对象
thread_pool(size_t tnum = TNUM) :_tnum(tnum) { // 省略, 跟原来一致 }
public:
// 因为构造已被私有化,故需要我们显示提供一个可以获取该唯一实例的方法
static thread_pool<T>* get_ptr_only_thread_pool(int num = TNUM)
{
if(_ptr_only_thread_pool == nullptr)
{
_ptr_only_thread_pool = new thread_pool(num);
}
return _ptr_only_thread_pool;
}
// 同时, 我们需要将拷贝构造、赋值都设为私有或者delete
thread_pool(const thread_pool& copy) = delete;
private:
thread_pool& operator=(const thread_pool& copy);
public:
// 按照我们以前懒汉实现单例的方式, 我们可以内嵌一个垃圾回收类
// 回收该唯一实例
class resources_recovery
{
public:
~resources_recovery()
{
if(_ptr_only_thread_pool)
{
delete _ptr_only_thread_pool;
_ptr_only_thread_pool = nullptr;
}
}
static resources_recovery _auto_delete_ptr_only_thread_pool;
};
public:
void run_all_thread(void) { // 省略, 跟原来一致 }
static void* routine(void* arg) { // 省略, 跟原来一致 }
void push_task(const T& task) { // 省略, 跟原来一致 }
void join() { // 省略, 跟原来一致 }
~thread_pool() { // 省略, 跟原来一致 }
private:
std::vector<thread*> _VPthread; // 线程对象表
size_t _tnum; // 线程个数
std::queue<T> _task_que; // 任务表
pthread_mutex_t _lock;
pthread_cond_t _cond;
// 因为构造已被私有化, 故我们需要定义一个静态对象的指针
static thread_pool<T>* _ptr_only_thread_pool;
};
// 初始化该唯一实例
template<class T>
thread_pool<T>* thread_pool<T>::_ptr_only_thread_pool = nullptr;
// 定义一个静态成员变量,进程结束时,系统会自动调用它的析构函数从而释放该单例对象
template<class T>
typename thread_pool<T>::resources_recovery thread_pool<T>::resources_recovery::_auto_delete_ptr_only_thread_pool;
}
#include "ThreadPool.hpp"
int main()
{
srand((unsigned int)time(nullptr) ^ getpid());
// 通过 get_ptr_only_thread_pool 接口获得线程池唯一实例对象
Xq::thread_pool<Xq::Task>* only_target = Xq::thread_pool<Xq::Task>::get_ptr_only_thread_pool();
only_target->run_all_thread();
while(true)
{
int x = rand() % 100;
int y = rand() % 50;
only_target->push_task(Xq::Task(x, y, [](int x, int y)->int{
return x + y;
}));
LogMessage(NORMAL, "%s%d + %d = ?", "Main thread 生产任务:", x, y);
sleep(1);
}
only_target->join();
return 0;
}
static thread_pool<T>* get_ptr_only_thread_pool(int num = TNUM)
{
if (_ptr_only_thread_pool == nullptr)
{
_ptr_only_thread_pool = new thread_pool(num);
}
return _ptr_only_thread_pool;
}
static thread_pool<T>* get_ptr_only_thread_pool(int num = TNUM)
{
{
// 通过加锁保证这个唯一实例的安全性
lock_guard lock(&_only_target_lock);
if (_ptr_only_thread_pool == nullptr)
{
_ptr_only_thread_pool = new thread_pool(num);
}
}
return _ptr_only_thread_pool;
}
static thread_pool<T>* get_ptr_only_thread_pool(int num = TNUM)
{
// 双重检查, 避免不必要的加锁和释放锁
if (_ptr_only_thread_pool == nullptr)
{
lock_guard lock(&_only_target_lock);
if (_ptr_only_thread_pool == nullptr)
{
_ptr_only_thread_pool = new thread_pool(num);
}
}
return _ptr_only_thread_pool;
}
namespace Xq
{
template<class T>
class thread_pool
{
public:
pthread_mutex_t* get_lock() { // 省略, 跟原来一致 }
bool is_que_empty() { // 省略, 跟原来一致 }
void wait_cond(void) { // 省略, 跟原来一致 }
T get_task(void) { // 省略, 跟原来一致 }
private:
// 将构造私有化, 用于实现单例
// 线程池的构造函数负责实例化线程对象
thread_pool(size_t tnum = TNUM) :_tnum(tnum) { // 省略, 跟原来一致 }
public:
// 因为构造已被私有化,故需要我们显示提供一个可以获取该唯一实例的方法
static thread_pool<T>* get_ptr_only_thread_pool(int num = TNUM)
{
// 双重检查锁定模式
if(_ptr_only_thread_pool == nullptr)
{
// 这把锁保证唯一实例
lock_guard lock(&_only_target_lock);
if(_ptr_only_thread_pool == nullptr)
{
_ptr_only_thread_pool = new thread_pool(num);
}
}
return _ptr_only_thread_pool;
}
// 同时, 我们需要将拷贝构造、赋值都设为私有或者delete
thread_pool(const thread_pool& copy) = delete;
private:
thread_pool& operator=(const thread_pool& copy);
public:
// 按照我们以前懒汉实现单例的方式, 我们可以内嵌一个垃圾回收类
// 回收该唯一实例
class resources_recovery
{
public:
~resources_recovery()
{
if(_ptr_only_thread_pool)
{
delete _ptr_only_thread_pool;
_ptr_only_thread_pool = nullptr;
}
}
static resources_recovery _auto_delete_ptr_only_thread_pool;
};
public:
void run_all_thread(void) { // 省略, 跟原来一致 }
static void* routine(void* arg) { // 省略, 跟原来一致 }
void push_task(const T& task) { // 省略, 跟原来一致 }
void join() { // 省略, 跟原来一致 }
~thread_pool() { // 省略, 跟原来一致 }
private:
std::vector<thread*> _VPthread; // 线程对象表
size_t _tnum; // 线程个数
std::queue<T> _task_que; // 任务表
pthread_mutex_t _lock;
pthread_cond_t _cond;
// 因为构造已被私有化, 故我们需要定义一个静态对象的指针
static thread_pool<T>* _ptr_only_thread_pool;
// 这把锁保证获取唯一实例的安全性
static pthread_mutex_t _only_target_lock;
};
// 初始化该唯一实例
template<class T>
thread_pool<T>* thread_pool<T>::_ptr_only_thread_pool = nullptr;
// 定义一个静态成员变量,进程结束时,系统会自动调用它的析构函数从而释放该单例对象
template<class T>
typename thread_pool<T>::resources_recovery thread_pool<T>::resources_recovery::_auto_delete_ptr_only_thread_pool;
// 初始化这把静态锁 (保证获取唯一实例的安全)
template<class T>
pthread_mutex_t thread_pool<T>::_only_target_lock = PTHREAD_MUTEX_INITIALIZER;
}
3. STL, 智能指针和线程安全
3.1. STL是否是线程安全的?
3.2. 智能指针是否是线程安全的?
4. 其它常见的锁
4.1. 自旋 && 自旋锁 --- spin lock
场景一:
场景二:
4.2. 自旋锁的接口介绍:
初始化自旋锁:
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
销毁自旋锁,释放自旋锁资源:
int pthread_spin_destroy(pthread_spinlock_t *lock);
加锁操作:
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
解锁操作,释放自动锁:
int pthread_spin_unlock(pthread_spinlock_t *lock);
4.3. 自旋锁的使用
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#define PTHREAD_NUM 3
#define BUFFER_SIZE 32
int ticket = 1000;
class PTHREAD_INFO
{
public:
PTHREAD_INFO(const std::string& name, pthread_spinlock_t* Plock)
:_name(name)
,_Plock(Plock)
{}
public:
std::string _name;
pthread_spinlock_t* _Plock;
};
void* GetTicket(void* arg)
{
PTHREAD_INFO* info = static_cast<PTHREAD_INFO*>(arg);
while(true)
{
pthread_spin_lock(info->_Plock);
if(ticket > 0)
{
usleep(1000);
std::cout << info->_name << " get a ticket " << ticket << std::endl;
ticket--;
pthread_spin_unlock(info->_Plock);
}
else
{
pthread_spin_unlock(info->_Plock);
break;
}
usleep(rand() % 500);
}
delete info;
return nullptr;
}
void Test1(void)
{
pthread_t tid[PTHREAD_NUM];
// 定义局部自旋锁
pthread_spinlock_t myspinlock;
// 初始化局部自旋锁, 且线程间共享
pthread_spin_init(&myspinlock, PTHREAD_PROCESS_PRIVATE);
char buffer[BUFFER_SIZE] = {0};
for(size_t i = 0; i < PTHREAD_NUM; ++i)
{
snprintf(buffer, BUFFER_SIZE, "%s-%ld", "thread", i + 1);
PTHREAD_INFO* info = new PTHREAD_INFO(buffer, &myspinlock);
pthread_create(tid + i, nullptr, GetTicket, static_cast<void*>(info));
}
for(size_t i = 0; i < PTHREAD_NUM; ++i)
{
pthread_join(tid[i], nullptr);
}
// 局部自旋锁需要我们手动释放
pthread_spin_destroy(&myspinlock);
}
int main()
{
srand((size_t)time(nullptr));
Test1();
return 0;
}
4.2. 读写锁
4.2.1. 读写锁的接口:
// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
// 释放读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
//加锁:
// 读者加锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
// 写者加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
// 解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);