【Linux取经路】线程周边——线程池、线程安全的单例模式、读写者问题-LMLPHP

一、线程池

// ThreadPool.hpp
#pragma once

#include <pthread.h>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include <unordered_map>

struct ThreadInfo
{
    pthread_t tid_; // 线程 ID
    std::string name_; // 线程的名字
};

template <class T>
class ThreadPool
{
    static const int defaultnum = 5;
public:
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

    void Weakup()
    {
        pthread_cond_signal(&cond_);
    }

    void Sleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }

    bool IsTaskQueueEmpty()
    {
        return tasks_.empty();
    }

    T PopTasks()
    {
        T task = tasks_.front();
        tasks_.pop();
        return task;
    }

    const std::string &GetThreadName(pthread_t tid)
    {
        return um_[tid];
    }
public:
    ThreadPool(int thread_num = defaultnum)
    :threads_(thread_num), thread_num_(thread_num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);    
    }

    static void *Routine(void *args) 
    {
        ThreadPool *tp = static_cast<ThreadPool*>(args);
        std::string name = tp->GetThreadName(pthread_self());
        while(true)
        {
            tp->Lock();
            while(tp->IsTaskQueueEmpty())
            {
                tp->Sleep();
            }

            T task = tp->PopTasks();
            tp->Unlock();
            task.run(); // 此时这个任务已经属于该线程私有的了,所以对任务的处理工作可以在解锁之后进行
            printf("%s is running----%s\n", name.c_str(), task.result_to_string().c_str());
        }
    }

    void start()
    {
        for(int i = 0; i < thread_num_; i++)
        {
            threads_[i].name_ = "Thread-" + std::to_string(i);
            pthread_create(&(threads_[i].tid_), nullptr, Routine, this);
            um_[threads_[i].tid_] = threads_[i].name_;
        }
    }

    void push(const T& task)
    {
        Lock();
        tasks_.push(task);
        Weakup();
        Unlock();
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }
private:
    std::vector<ThreadInfo> threads_; // 一批线程
    int thread_num_; // 线程池中的线程的数量
    std::queue<T> tasks_; // 任务
    pthread_mutex_t mutex_; // 定义一把锁,让所有线程保持互斥
    pthread_cond_t cond_; // 定义一个条件变量,让线程同步
    std::unordered_map<pthread_t, std::string> um_; // 一个 map 用来快速查找一个线程的name
};

注意:Routine 方法如果要写在函数内部,那么一定得是 static 的。因为 pthread_creat 函数的参数要求 Routine 必须是 void *(*)(void *) 类型。而非 static 的成员函数其第一个参数是隐藏的 this 指针,不符合要求。将 Routine 设置为 static,此时又会有新的问题产生,就是,在静态成员函数中无法访问到非静态成员,为了解决这个问题,我们将 this 指针做为线程函数的参数传递过去,此时就可以在 Routine 函数中去调用非静态的成员。

// main.cpp
#include "ThreadPool.hpp"
#include "Task.h"
#include <unistd.h>
#include <iostream>

using namespace std;

const std::string opers = "+-*/%";

int main()
{
    srand((unsigned int)time(nullptr));
    ThreadPool<Task> *tp = new ThreadPool<Task>(5);
    tp->start();
    int len = opers.size();
    while(true)
    {
        //1. 构建任务
        // 模拟获取数据
        int data1 = rand() % 10 + 1; // [1, 10]
        usleep(10);

        int data2 = rand() % 13; // [0, 13]
        usleep(10);

        char op = opers[rand() % len];

        Task task(data1, data2, op);
        //2. 交给线程池处理
        printf("main thread push a task: %s\n", task.get_task().c_str());
        tp->push(task);

        usleep(100000);
    }
    return 0;
}

【Linux取经路】线程周边——线程池、线程安全的单例模式、读写者问题-LMLPHP

二、封装线程

// Thread.hpp
#pragma once
#include <pthread.h>
#include <string>

typedef void (*callback_t)();

class Thread
{
public:
    static void *Routine(void *args)
    {
        Thread *th = static_cast<Thread*>(args);
        th->Entery();
    }

    Thread(callback_t func)
    :tid_(0), name_(""), start_time_(0), isrunning_(false), func_(func)
    {}

    void Run()
    {
        name_ = "Thread-"+std::to_string(num);
        start_time_ = time(nullptr);
        isrunning_ = true;
        pthread_create(&tid_, nullptr, Routine, this);
    }

    void Join()
    {
        pthread_join(tid_, nullptr);
        isrunning_ = false;
    }

    pthread_t get_tid()
    {
        return tid_;
    }

    std::string get_name()
    {
        return name_;
    }

    uint64_t get_start_time()
    {
        return start_time_;
    }

    bool is_running()
    {
        return isrunning_;
    }

    void Entery()
    {
        func_();
    }

    ~Thread()
    {}
private:
    pthread_t tid_;
    std::string name_;
    uint64_t start_time_; // 线程启动时间
    bool isrunning_;

    callback_t func_; // 线程将来要执行的函数, 回调函数
    static int num; // 线程编号
};

int Thread::num = 1;

线程封装的好处在于,可以像使用对象一样去使用线程,如下:

// main.cc
#include "Thread.hpp"
#include <cstdio>
#include <unistd.h>
#include <iostream>
#include <cstdio>
#include <vector>

using namespace std;

void Print()
{
    while(true)
    {
        printf("Hello Linux\n");
        sleep(1);
    }
}

int main()
{
    vector<Thread> threads;

    for(int i = 0; i < 5; i++)
    {
        threads.push_back(Thread(Print));
    }

    for(auto &thread : threads)
    {
        thread.Run();
    }

    for(auto &thread : threads)
    {
        thread.Join();
    }


    return 0;
}

【Linux取经路】线程周边——线程池、线程安全的单例模式、读写者问题-LMLPHP

三、STL,智能指针和线程安全

3.1 STL 中的容器是否是线程安全的?

不是,因为 STL 设计的初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。而且对于不同的容器,加锁方式不同,性能可能也不同(例如 hash 表的锁表和锁桶)。因此 STL 默认不是线程安全的。如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。

3.2 智能指针是否是线程安全的?

对于 unique_ptr,由于只是在当前代码范围内生效,因此不涉及线程安全问题。对于 shared_ptr,多个对象需要共用一个引用计数变量,所以会存在线程安全问题。但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证 shared_ptr 能够高效,原子的操作引用计数。

四、线程安全的单例模式

4.1 什么是单例模式?

单例模式是一种“经典的,常用的“设计模式。所谓设计模式就是,对一些经典的常见场景,给定了一些应对的解决方案,这个就是设计模式**。某些类,只应该具有一个对象(实例),就称之为单例**,在很多服务器开发场景中,经常需要让服务器加载很多数据(上百G)到内存中,此时往往要用一个单例的类来管理这些数据。

4.2 饿汉和懒汉

4.2.1 饿汉方式实现单例

饿汉方式:就是吃完饭,立刻洗完,因为下一顿吃的时候可以立刻拿着碗就能吃饭。

template <typename T>
class Singleton 
{
	static T data;
public:
	static T* GetInstance() 
    {
		return &data;
	}
};

小Tips:全局变量和静态变量,在程序加载的时候就已经被创建好了,局部对象实在程序运行过程中进行加载的。

饿汉模式,在 Singleton 类中定义了一个 T 类型的成员静态成员变量,因为是静态的,无论创建多少个 Singleton 对象,最终都会只有一个 T 类型的对象,并且因为是静态的,在程序被加载的时候就已经被创建出来了,后面在需要的时候直接使用即可。

4.2.2 懒汉方式实现单例

懒汉方式:就是吃完饭,先把碗放下,然后下一顿饭用到这个碗了再洗碗,这就是懒汉方式。懒汉方式最核心的思想是 “延时加载”,从能够优化服务器的启动速度。其实从整个过程来看,懒汉模式并不会提高太多效率,因为最终该加载的东西还是需要花时间去进行加载,饿汉模式采用立即加载的方式,前期可能会慢一点,而懒汉模式前期只是去申请资源,并没有实际的去加载资源,而是等待需要的时候再进行加载,所以说,懒汉方式可以优化启动速度。

template <typename T>
class Singleton 
{
	static T* inst;
public:
    static T* GetInstance() 
    {
        if (inst == NULL) 
        {
        	inst = new T();
        }
    	return inst;
    }
};

懒汉模式中,Singeton 类中是一个静态的指针,因为是指针,所以在程序加载的时候并不会立即就创建出一个 T 类型的对象,而是在需要的时候,调用 GetInstance 创建对象。但是这样做存在线程安全问题,第一次调用 Getlnstance 的时候,如果两个线程同时调用,可能会创建出两份 T 对象的实例,但是后续再次调用,就没有问题了。

4.3 基于懒汉方式实现的单例线程池

#pragma once

#include <pthread.h>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include <unordered_map>

struct ThreadInfo
{
    pthread_t tid_;    // 线程 ID
    std::string name_; // 线程的名字
};

template <class T>
class ThreadPool
{
    static const int defaultnum = 5;

public:
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

    void Weakup()
    {
        pthread_cond_signal(&cond_);
    }

    void Sleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }

    bool IsTaskQueueEmpty()
    {
        return tasks_.empty();
    }

    T PopTasks()
    {
        T task = tasks_.front();
        tasks_.pop();
        return task;
    }

    const std::string &GetThreadName(pthread_t tid)
    {
        return um_[tid];
    }

public:
    static void *Routine(void *args)
    {
        ThreadPool *tp = static_cast<ThreadPool *>(args);
        std::string name = tp->GetThreadName(pthread_self());
        while (true)
        {
            tp->Lock();
            while (tp->IsTaskQueueEmpty())
            {
                tp->Sleep();
            }

            T task = tp->PopTasks();
            tp->Unlock();
            task.run(); // 此时这个任务已经属于该线程私有的了,所以对任务的处理工作可以在解锁之后进行
            printf("%s is running----%s\n", name.c_str(), task.result_to_string().c_str());
        }
    }

    void start()
    {
        for (int i = 0; i < thread_num_; i++)
        {
            threads_[i].name_ = "Thread-" + std::to_string(i);
            pthread_create(&(threads_[i].tid_), nullptr, Routine, this);
            um_[threads_[i].tid_] = threads_[i].name_;
        }
    }

    void push(const T &task)
    {
        Lock();
        tasks_.push(task);
        Weakup();
        Unlock();
    }

    static ThreadPool<T> *GetInstance() // 指正通过该接口获取一个单例对象
    {
        if (ptp_ == nullptr)
        {
            pthread_mutex_lock(&smutex_);
            if (ptp_ == nullptr)
            {
                printf("log: singleton creat done first!\n");
                ptp_ = new ThreadPool<T>();
            }
            pthread_mutex_unlock(&smutex_);
        }
        return ptp_;
    }

private:
    ThreadPool(int thread_num = defaultnum)
        : threads_(thread_num), thread_num_(thread_num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }
    ThreadPool(const ThreadPool<T> &tp) = delete;
    const ThreadPool<T> &operator=(const ThreadPool<T> &tp) = delete;

private:
    std::vector<ThreadInfo> threads_;               // 一批线程
    int thread_num_;                                // 线程池中的线程的数量
    std::queue<T> tasks_;                           // 任务
    pthread_mutex_t mutex_;                         // 定义一把锁,让所有线程保持互斥
    pthread_cond_t cond_;                           // 定义一个条件变量,让线程同步
    std::unordered_map<pthread_t, std::string> um_; // 一个 map 用来快速查找一个线程的name

    static ThreadPool<T> *ptp_;     // 单例
    static pthread_mutex_t smutex_; // 定义一把静态的锁
};

template <class T>
ThreadPool<T> *ThreadPool<T>::ptp_ = nullptr;

template <class T>
pthread_mutex_t ThreadPool<T>::smutex_ = PTHREAD_MUTEX_INITIALIZER;

相较于上面第九节中的普通版线程池,这一版基于懒汉方式实现的单例线程池,主要改动的地方在:增加一个 static static ThreadPool<T> * 类型的指针,指向一个线程池;将所有可能创建对象的成员函数私有,这样就无法在类外直接创建线程池对象,将拷贝构造和运算符重载直接禁掉;提供一个静态的成员方法,供外部来获取一个单例的进程池对象。

在最开始这个单例对象还没被创建的时候,可能存在多个线程同时去获取,此时可能有多个线程判断 ptp_ == nullptr 都成立,所以可能会创建出多份线程池对象,所以在判断前需要先加锁,因为需要再静态成员函数里面使用锁,所以我们需要定义一个静态的锁。但是,仔细想想,只有在对象没有被创建出来的时候,才可能存在多个线程同时去获取,可能会创建多个对象,只要该对象被创建出来了,之后无论同时再来多少线程都不会去创建对象,但是因为加锁了,所以后面来的多个线程都只能去串行的申请线程池,此时再看加锁操作,仿佛就显得有点多此一举,因此我们在加锁前可以再进行以判断,看 ptp_ 是否等于空,只有当其等于空的时候,再进行加锁,这样做级既可以保证不会创建出多个线程池对象,也不会因为加锁导致效率下降。

线程池使用:

#include "ThreadPool.hpp"
#include "Task.h"
#include <unistd.h>
#include <iostream>

using namespace std;

const std::string opers = "+-*/%";

int main()
{
    printf("main thread is start!...\n");
    sleep(3);
    srand((unsigned int)time(nullptr));
    // ThreadPool<Task> *tp = ThreadPool<Task>::GetInstance(); // 获取单例对象
    ThreadPool<Task>::GetInstance()->start(); // 获取一个单例对象
    int len = opers.size();
    while(true)
    {
        //1. 构建任务
        // 模拟获取数据
        int data1 = rand() % 10 + 1; // [1, 10]
        usleep(10);

        int data2 = rand() % 13; // [0, 13]
        usleep(10);

        char op = opers[rand() % len];

        Task task(data1, data2, op);
        //2. 交给线程池处理
        printf("main thread push a task: %s\n", task.get_task().c_str());
        ThreadPool<Task>::GetInstance()->push(task);

        usleep(1000000);
    }
    return 0;
}

【Linux取经路】线程周边——线程池、线程安全的单例模式、读写者问题-LMLPHP

五、其他常见的各种锁

  • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁、写锁、行锁等),当其它线程想要访问数据时,被阻塞挂起。
  • 乐观锁: 每次取数据的时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他线程在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和 CAS 操作。
  • CAS 操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
  • 自旋锁:当一个进程申请锁失败时,不是将自己挂起,而是继续去申请锁。使用这种锁的前提是线程在临界区中执行的时间要足够的短。可以通过 pthread_mutex_trylock 配合 while 循环实现一个自旋锁。pthread 库也为我们提供了自旋锁 pthread_spinlock_t

六、读者写者问题

6.1 读写锁

在多线程场景下,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的过程,中间耗时很长,给这种代码段加锁,会极大的降低程序效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢?答案是有,采用读写锁。

【Linux取经路】线程周边——线程池、线程安全的单例模式、读写者问题-LMLPHP

pthread 库也为我们准备了读写锁 pthread_rwlock_t,其接口和互斥量的一样,只不过加锁的接口有所不同,对读者来说,加锁使用 pthread_rwlock_rdlock 接口,对写者来说,使用 pthread_rwlock_wrlock 进行加锁,读写者释放锁都采用同一个接口 pthread_rwlock_unlock 进行释放。

读写者模型的同步策略:采用读写锁的前提是,存在非常多的读者,而只有少量的写者,这就必定会导致大部分时间都是读者在执行相应的代码,写者处于一种饥饿状态,这种情况也叫做读者优先。我们可以通过设置写者优先同步策略来解决这种问题,当读者和写者都在等待锁的时候,当锁被释放的时候,让写者拿到锁。

【Linux取经路】线程周边——线程池、线程安全的单例模式、读写者问题-LMLPHP

七、结语

今天的分享到这里就结束啦!如果觉得文章还不错的话,可以三连支持一下,春人的主页还有很多有趣的文章,欢迎小伙伴们前去点评,您的支持就是春人前进的动力!

【Linux取经路】线程周边——线程池、线程安全的单例模式、读写者问题-LMLPHP

05-26 00:14