【Linux】线程池项目详解-LMLPHP

回避、躲闪、辗转腾挪都毫无作用,
既然来的总是要来,
迎着刀锋而上恐怕是最好的选择,
起码节约时间。
--- 廖一梅 《像我这样笨拙地生活》---

1 线程基础

线程我们已经学习的差不多了,从线程的概念:

线程的管理是在共享区完成的,编译时,动态链接线程库,映射在地址空间的共享区中。在这个共享区中储存着线程的属性内存块(包含线程单独的栈结构),通过tid我们可以找到线程的所有属性。每个线程都对应一个LWP的pid,这是系统层线程调度的单位!

需要特别注意的是线程互斥的场景,在多线程的场景下,对于全局资源的处理有且只能用一个线程进行操作,否则就会出现意想不到的后果!对于多线程的场景使用互斥锁来对全局资源进行保护,可以通过RAII规则的锁守卫完成只能加减锁!

2 什么是线程池

之前我们实现过进程池
进程池就是通过预先创建若干个进程与管道,在需要进行任务时,选择一个进程,通过管道发送信息,让其完成工作。不同的进程因为不能共享地址空间,所以想要协同工作就需要进行进程间通信,这里使用管道来实现进程
间的通信。而对于线程池来说,多线程之间是共享地址空间的,所以不需要进行额外的通信,直接调用线程来执行任务就可以!

线程池完成的工作就是在程序运行时,自动创建出若干个线程等待主线程发送任务进行执行,这样不再需要每次再创建线程来完成一个任务,只需要向任务队列中压入任务,线程池就会自动唤醒一个线程来执行任务,执行完就会继续等待任务的到来!
【Linux】线程池项目详解-LMLPHP

线程池的应用场景:

3 线程池工作原理

线程池的关键部分可以分为:

  1. 线程容器:用来管理创建的线程,方便统一初始化。
  2. 任务队列:用来储存任务消息,需要支持压入与取出的操作。
  3. 线程函数:线程都需要执行这个函数模块,在这个函数模块中进行任务的等待和执行。
  4. 线程唤醒机制:需要一个线程换取机制,通过条件变量个互斥锁完成对线程的保护与唤醒。
  5. 单例模式:线程池不需要创建多个,一个程序只需要一个线程池,通过单例模式进行优化。

这样,通过主线程对线程池中进行的入队列操作就可以传入任务,然后线程池中会自动检测队列中是否有任务,有任务就调用休眠的线程来执行任务。

4 构建线程池

4.1 框架搭建

首先针对线程池的关键组件进行一个框架的构建:

线程池的成员变量:

为了使用线程方便,我们直接使用之前实现的线程类!

然后我们还需要一下功能函数来支持主线程传入任务,主线程停止工作,线程池读取任务,线程池删除旧任务:

功能函数:

为了方便进行加减锁的操作,我们可以完成一些线程池内部函数:

内部函数
主要框架
#pragma once

#include "Thread.hpp"
#include <vector>
#include <queue>
#include <string>
#include "Log.hpp"

using namespace ThreadMouble;
using namespace log_ns;

const int default_num = 5;

// 测试代码
void test()
{
    std::cout << "这是一个测试程序!" << std::endl;
}

template <class T>
class ThreadPool
{
private:
    // 加锁 解锁
    void LockQueue()
    {
        pthread_mutex_lock(&_mtx);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mtx);
    }
    // 休眠等待
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mtx);
    }
    // 唤醒一个线程
    void WakeUp()
    {
        pthread_cond_signal(&_cond);
    }
    // 全部唤醒
    void WakeUpAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    // 队列为空
    bool IsEmpty()
    {
        return _tasks.empty();
    }
    void HandlerTask(std::string &name)
    {
    }
public:
    ThreadPool(int num = default_num) : _thread_num(num), _sleep_num(0), _isrunning(false)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
public:
    static ThreadPool<T> *GetInstance()
    { 
    }
    // 初始化
    void Init() 
    {
    }
    // 开始运行
    void Start() 
    {
    }
    // 停止运行
    void Stop()
    { 
    }
    // 加入任务
    void Equeue(T &in)
    {
      
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

private:
    // 线程容器
    std::vector<Thread> _threads;
    // 任务队列
    std::queue<T> _tasks;
    // 线程数量
    int _thread_num;
    // 休眠数量
    int _sleep_num;
    // 运行判断
    bool _isrunning;
    // 加锁保护队列
    pthread_mutex_t _mtx;
    // 条件变量
    pthread_cond_t _cond;

};

4.3 HandlerTask函数

我们首先先来完成每个线程创建的新线程都会进行的函数:

  1. 首先这个函数需要不断的执行,所以使用while(true)使其不断地轮询
  2. 然后就是对队列任务的读取,如果队列为空并且线程池还在运行,那么就进入进行等待条件变量唤醒,需要注意的是休眠数需要进行处理
  3. 如果队列为空了,并且停止运行了,就直接退出!退出前进行解锁!
  4. 如果队列不为空,并且还在运行,那么就从队列中取出一个任务进行执行!
void HandlerTask(std::string &name)
    {
        // 运行任务
        while (true)
        {
            LockQueue();
            // 队列为空并且正在运行
            while (IsEmpty() && _isrunning)
            {
                // 进行阻塞
                _sleep_num++;
                LOG(INFO, "%s sleep begin!\n", name.c_str());
                Sleep();
                LOG(INFO, "%s wakeup!\n", name.c_str());
                _sleep_num--;
            }

            // 如果队列为空 停止运行了
            if (IsEmpty() && !_isrunning)
            {
                // 直接解锁退出
                UnlockQueue();
                // std::cout << name << " stop 退出!" << std::endl;
                LOG(INFO, "%s quit !\n", name.c_str());
                break;
            }

            // 取出一个任务
            T t = _tasks.front();
            _tasks.pop();
            // 解锁
            UnlockQueue();
            // 临界区之外执行任务 
            t();
            // std::cout << name << " " << t.result() << std::endl;
            LOG(DEBUG, "HandlerTask Done, task is : %s\n", t.result().c_str());
        }
    }

完成!

4.3 基础函数

我们先来实现初始化init , 开始运行 start ,停止运行stop,加入任务

  1. 初始化:首先就是创建若干个线程,再将创建的线程存入线程容器中。

    // 初始化
    void Init() 
    {
        // 进行绑定
        func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _thread_num; i++)
        {
            std::string name = "thread-" + std::to_string(i + 1);
            // std::cout << name << " init!" << std::endl;
            //_threads.emplace_back(name, test); //测试
            _threads.emplace_back(name, func);
            LOG(DEBUG, "construct thread : %s done , init success\n", name.c_str());
        }
    }
    
  2. 开始运行:直接遍历一遍进行开始运行即可!每个线程都来执行HandlerTask

    // 开始运行
      	  void Start() 
        {
            _isrunning = true;
            for (auto &e : _threads)
            {
                // std::cout << e.getname() << " start!" << std::endl;
                LOG(DEBUG, "start thread %s done\n", e.Name().c_str());
                e.Start();
            }
    
  3. 停止运行:唤醒所有休眠的线程,并将判断符设置为false

       // 停止运行
       void Stop()
       {
           LockQueue();
           // std::cout << "void stop()" << std::endl;
           WakeUpAll();
           _isrunning = false;
           UnlockQueue();
           LOG(INFO, "ThreadPool Stop success\n");
       }
    
  4. 加入任务: 这里会对全局变量进行操作,所以先上锁。在线程池还在运行时才可以进程任务的插入,插入后,如果有休眠的线程就唤醒一个休眠的线程来执行任务!

    // 加入任务
    void Equeue(T &in)
    {
        // 临界区操作需要加锁
        LockQueue();
        // 只有线程池运行才可以进行插入
        if (_isrunning)
        {
            _tasks.push(in);
            // std::cout << "加入任务 : " << in.debug() << std::endl;
            LOG(INFO, "push task : %s\n", in.debug().c_str());
            // 唤醒一个线程
            if (_sleep_num > 0)
                WakeUp();
        }
    
        UnlockQueue();
    }
    

4.4 单例模式改造

单例模式之前的文章有介绍过:设计模式 — 单例模式

接下来我们通过懒汉模式进行优化:

  1. 首先我们要做的就是将构造函数私有化,让类外部不能够创建对象,并且封锁赋值重载和拷贝构造!
  2. 然后类内需要一个静态类对象指针,并且使用单独一个全局锁进行保护
  3. 完成一个获取唯一类对象指针的函数方法getinstance
static ThreadPool<T> *GetInstance()
    {
        if (_tp == nullptr)
        {
            LockGuard lock(&_sig_mtx);
            if (_tp == nullptr)
            {
                LOG(INFO, "create threadpool\n");
                // thread-1 thread-2 thread-3....
                _tp = new ThreadPool();
                _tp->Init();
                _tp->Start();
                
            }
            else
            {
                LOG(INFO, "get threadpool\n");
            }
        }
        return _tp;
    }
    
// 单例模式
template <class T>
class ThreadPool
{
	//...
    static ThreadPool<T> *_tp;
    static pthread_mutex_t _sig_mtx;
    //...
};
//类外初始化
template <class T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::_sig_mtx = PTHREAD_MUTEX_INITIALIZER;

这样单例模式就完成了!

4.5 测试运行

#include "ThreadPool.hpp"
#include <iostream>
#include "Task.hpp"
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include "Log.hpp"


int main()
{
    srand(time(nullptr) ^ getpid());
    // ThreadPool<Task> *tp = new ThreadPool<Task>();
    // tp->Init();
    // tp->Start();
    ThreadPool<Task> *tp = ThreadPool<Task>::GetInstance();
    int cnt = 5;
    while (--cnt)
    {
        int num1 = rand() % 10;
        usleep(1000);
        int num2 = rand() % 10;
        Task t(num1, num2);
        tp->Equeue(t);
        LOG(INFO , "Equeue a task , %s\n" , t.debug().c_str());
        sleep(1);
    }
    tp->Stop();
    LOG(INFO , "ThreadPool stop! \n");
    return 0;
}

我们来进行测试:
【Linux】线程池项目详解-LMLPHP
很好的完成测试代码!!!

5 总结

线程的学习就告一段落,接下来我将会完成一个高并发内存池项目,来巩固C++的知识,并为简历增添一笔重要颜色!完成项目之后开启全新篇章 — 计算机网络,欢迎大家支持!!!

接下来还会持续更新算法相关内容,欢迎大家支持!!!

08-26 04:51