linux线程 | 同步与互斥 | 线程池以及知识点补充-LMLPHP

目录

 线程池

 什么是线程池

线程池的应用场景

代码实现 

准备文件

makefile

Task.h

ThreadPool.h

main.cpp

运行结果

单例模式

常见的锁

自旋锁

自旋锁接口

读者写者问题 

概念

 接口

理解


 线程池

 什么是线程池

线程池的应用场景

代码实现 

准备文件

linux线程 | 同步与互斥 | 线程池以及知识点补充-LMLPHP

makefile

main.exe:main.cpp
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -rf main.exe

Task.h

#include <iostream>
using namespace std;
#include <vector>
#include <string>

//加减乘除
string opers = "+-*/%";



//Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:
    //构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算
    Task(int x, int y, char op)
        : data1_(x), data2_(y), op_(op)
    {}

    ~Task() {}

    //执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。 
    void run()
    {
        switch (op_)
        {
        case '+':
            cout << data1_ << "+" << data2_ << "=" << data1_ + data2_ << endl;
            break;
        case '-':
            cout << data1_ << "-" << data2_ << "=" << data1_ - data2_ << endl;
            break;
        case '*':
            cout << data1_ << "*" << data2_ << "=" << data1_ * data2_ << endl;
            break;
        case '/':
            if (data2_ == 0)
            {
                cout << "error, " << data1_ << '/' << data2_ << " is error!" << endl;
            }
            else
            {
                cout << data1_ << "/" << data2_ << "=" << data1_ / data2_ << endl;
            }
            break;
        case '%':
            if (data2_ == 0)
            {
                cout << "error, " << data1_ << '%' << data2_ << " is error!" << endl;
            }
            else
            {
                cout << data1_ << "%" << data2_ << "=" << data1_ % data2_ << endl;
            }
            break;
        default:
            cout << "default" << endl;
            break;
        }
    }

    //仿函数, 为了方便我们的对象能够像函数一样使用。 
    void operator()()
    {
        run();
    }

private:
    //每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_
    int data1_;
    int data2_;

    char op_;
};

ThreadPool.h


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>

//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{
    pthread_t tid_;
    string name_;   
};


template<class T>
class ThreadPool
{
    static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)

private:
    //加锁解锁
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

    //唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒
    void Wakeup()
    {
        pthread_cond_signal(&cond_);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }

public:

    //线程要执行的函数
    static void* HandlerTask(void* args)
    {
        ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
        while(true) 
        {
            tp->Lock();
            while (tp->tasks_.empty())
            {
                tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。
            }
            //否则就去拿到tasks里面的任务
            T t = tp->tasks_.front();
            tp->tasks_.pop();
            
            //
            tp->Unlock();
            t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    

        }
    }

    //运行这个线程池, 也就是先将线程创建出来。 然后去运行线程
    void Start()
    {
        int num = threads_.size();
        for (int i = 0; i < num; i++)
        {
            threads_[i].name_ = "thread-";
            threads_[i].name_ += to_string(i);   
            pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);
        }
    }

    //主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理
    void Push(const T& t)
    {
        Lock();
        tasks_.push(t);
        Wakeup();

        Unlock();
    }

    //线程池地初始化, 就是将锁和条件变量都初始化一下
    ThreadPool(int num = defaultnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }

    //析构
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }

private:
    vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,
    queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 

    pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 
    pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 

};

main.cpp

#include"ThreadPool.h"
#include"Task.h"

int main()
{
    //运行线程池
    ThreadPool<Task>* tp = new ThreadPool<Task>();
    tp->Start();    

    srand(time(nullptr) ^ getpid());
    while (true)
    {
        //构建任务  
        int x = rand() % 10 + 1;
        usleep(10);
        int y = rand() % 5;
        char op = opers[rand()% opers.size()];
        Task t(x, y, op);
        
        //交给线程池处理
        //主线程给线程池发送任务, 其实就相当于主线程时生产者。 
        tp->Push(t);



        sleep(1);
    }
    return 0;
}

运行结果

linux线程 | 同步与互斥 | 线程池以及知识点补充-LMLPHP

单例模式

        利用单例模式来创建线程池

主要改动就是ThreadPool.h


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>

//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{
    pthread_t tid_;
    string name_;   
};


template<class T>
class ThreadPool
{
    static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)

private:
    //加锁解锁
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }

    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }

    //唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒
    void Wakeup()
    {
        pthread_cond_signal(&cond_);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }



public:

    //线程要执行的函数
    static void* HandlerTask(void* args)
    {
        ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
        while(true) 
        {
            tp->Lock();
            while (tp->tasks_.empty())
            {
                tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。
            }
            //否则就去拿到tasks里面的任务
            T t = tp->tasks_.front();
            tp->tasks_.pop();
            
            //
            tp->Unlock();
            t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    

        }
    }

    //运行这个线程池, 也就是先将线程创建出来。 然后去运行线程
    void Start()
    {
        int num = threads_.size();
        for (int i = 0; i < num; i++)
        {
            threads_[i].name_ = "thread-";
            threads_[i].name_ += to_string(i);   
            pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);
        }
    }

    //主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理
    void Push(const T& t)
    {
        Lock();
        tasks_.push(t);
        Wakeup();

        Unlock();
    }

    //线程池地初始化, 就是将锁和条件变量都初始化一下
    ThreadPool(int num = defaultnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }

    //析构
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }

    //获取单例
    //改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么
    //要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。 
    //解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够
    //进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连
    //第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!
    static ThreadPool<T>* GetInstance(int num = defaultnum)
    {
        if (tp_ == nullptr)
        {
            pthread_mutex__lock(&lock_);
            
            if (tp_ == nullptr) 
            {
                tp_ = new ThreadPool<T>(num);
            }

            pthread_mutex_unlock(&lock_);
        }
        
        return tp_;
    }



private:
    //构造函数私有化, 只有Getinstance里面才能创建。 
    ThreadPool(int num = defaultnum)
        :threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }

    //单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。 
    ThreadPool(const ThreadPool<T>& tp) = delete;
    const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;

    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }


private:
    vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,
    queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 

    pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 

    pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 

    

    pthread_mutex_t lock_;         //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。
    static ThreadPool<T>* tp_;    //tp指针, 这就是唯一个单例对象。 

};

template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;    

 运行结果和上次一样:

linux线程 | 同步与互斥 | 线程池以及知识点补充-LMLPHP

常见的锁

自旋锁

在讲自旋锁之前, 博主要将在讲解进程等待的时候的故事再重新讲一下:       

        在上面的故事当中, 我们把小明当成临界区代码, 那么小王在楼下等小明和去网吧等小明取决于什么? 是不是就是取决于小明的时常? 我们前面一直都在研究临界区的线程安全问题。 但是我们几乎没有研究过线程在临界区待得时间的长短的问题。 等待什么呢? 等待刚刚进去的线程什么时候出来。 那么, 我们如果一个线程在临界区内待得时间非常长。 那么其他现车给在申请锁之后最好的做法是不是就是挂起? 如果一个线程在临界区待得时间非常短, 我们此时可以选择让其他线程在此时不要选择挂起, 而是处于一种自旋状态。 而我们的小王每十秒就打个电话的过程其实就是自选的过程。 所以我么以前学习的所有锁, 全部都是挂起等待锁。 一个线程进入, 其他的线程竞争失败, 失败的时候他们就把自己挂起等待了。 挂起等待要不要花时间? 挂起的时候就是将我们的执行流放到我们的等待队列里面, 然后唤醒的时候又把它们从等待队列里面拆下来放到cpu里面去执行。 这个过程来回对数据结构做迁移, 是要花时间的。 ——所以, 什么是自旋, 自旋就是不把自己挂起, 而是由线程不断地去周而复始的去申请锁。 如果申请锁成功则进入, 失败则返回重新检测锁的状态。 

自旋锁接口

        其实我们可以自己实现自旋锁, 就是使用trylock, trylock就是尝试检测这个锁, 如果这个锁没有申请成功, 就出错返回。 但是lock就是直接挂起, 所以我们想要实现自旋锁就可以使用while循环然后trylock。 

        也可以使用系统给我们提供的接口

linux线程 | 同步与互斥 | 线程池以及知识点补充-LMLPHP

        可以自己使用spin_lock, 这个函数就会对一个锁返回申请, 直到申请成功。 spin_trylock就是和mutex_trylock一样,都是申请失败了就直接返回, 想要自选需要自己加。

读者写者问题 

概念

        生活中有哪些场景是读者写者问题呢?

        我们在初中小学画的板报、作家写的小说、甚至博主写的这篇博客都是读者写者的问题。 就比如我们写一篇博客, 我们写博客的人就是写者, 然后读这篇博客的人就是读者。 

        对于读者写者问题来说, 也是要遵守321原则——三种关系、两种角色、一个交易场所。

 接口

理解

        一般而言, 读多写少。 如果有一个写者在写, 任何读者都不能进来。 而一个读者正在读, 任何写者都不能进来。  所以, 正常来讲, 读者争锁的能力比写者要强。 所以在我们的读者写者当中, 我们往往会出现读者很多, 而写者很少的情况。 所以竞争锁的时候我们的读者竞争到锁的概率是非常的高的, 进而导致写者长时间得不到锁而产生饥饿问题。

        饥饿问题是一个中性的现象还是一个偏向编译的现象呢? 在读者写者模型这里是一个比较偏向中性的词语。 因为他就是一个事实, 因为读者本来就多, 读者本来就多写者本来就少, 所以就注定了根据这种场景是默认的现象。 那么这种默认的行为我们叫做读者优先。 

现在我们实现一下读者优先的伪代码

首先就是读者的加锁和解锁操作:
lock(&rlock);
reader_count++;
if (reader_count == 1) lock(&wlock);  //reader_count == 1说明这是第一个读者, 从这里开始就让读者进不来!!!
unlock(&rlock);

//读者进来了之后, 就进行读取写者的数据
//都读取完成之后就离开
lock(&rlock);
reader_count--;
if (reader_count == 0) unlock(wlock); //如果是最后一个读者, 就让写者进来,就可以写数据了。 
unlock(&rlock);

然后是写者的加锁和解锁,写者的比较简单,因为写者少, 竞争锁的能力弱
lock(&wlock);

//写入

unlock(&wlock);

——————以上就是本节全部内容哦, 如果对友友们有帮助的话可以关注博主, 方便学习更多知识哦!!!   

10-26 06:33