linux:生产者消费者模型-LMLPHP


前言

本文是对于生产者消费者模型的知识总结


一、生产者消费者模型

生产者消费者模型就是通过一个容器来解决生产者消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过之间的容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接交给容器,消费者不找生产者要数据,而是直接从容器中取数据,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,其中这个容器就是用于生产者和消费者之间解耦的
(强耦合是指两个或多个系统,组件或模块之间存在紧密依赖关系。)

linux:生产者消费者模型-LMLPHP


特点:

  1. 三种关系:生产者与生产者之间(互斥),消费者与消费者之间(互斥),生产者与消费者之间(互斥 && 同步)
  2. 两种角色:生产者和消费者
  3. 一个交易(通讯)场所:一个容器(一段内存空间)

因为我们是多个线程访问同一个容器,那必然会导致数据不一致的问题,所以我们需要对该临界资源加锁,所以生产者与生产者之间,消费者与消费者之间,生产者与消费者之间都是互斥的。
又因为容器可能为空(满),此时消费者(生产者)还一直在临界区申请锁,又因没有数据(空间)而释放锁,从而不断申请锁释放锁,导致生产者(消费者)的饥饿问题。此时我们就需要生产者与消费者之间的同步。

对于2,3两点,这很好理解不解释。
我们编写生产者消费者模型的本质就是对以上三点的维护。(互斥保证数据安全,同步保证效率)


优点:

  1. 解耦
  2. 支持并发
  3. 支持忙闲不均

对于第一点,不就是生产者与消费者通过容器来解耦提升效率(如果没有这个容器,则生产者生产完数据,就必须等待消费者来接受处理数据,不能立刻继续生产数据)。
对于第二点,当生产者在生产数据时,消费者也同时在处理数据
对于第三点,当生产者生产数据的速度超过消费者的处理能力时,容器可以起到缓存的作用,将多余的数据暂时存储,等待消费者有空闲时再进行处理。如果消费者处理数据的能力超过生产者时,同理。

二、基于阻塞队列的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

  • 队列为空时,从队列中取数据将被阻塞,直到队列中有数据时被唤醒。
  • 队列为满时,向队列中放入数据将被阻塞,直到队列中有数据取出被唤醒。

代码实现

下面是一个单生成单消费模型
linux:生产者消费者模型-LMLPHP
LockGuard.hpp 文件 将加锁释放锁,交给一个对象处理,当对象创建加锁,对象销毁释放锁

#pragma once
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *mutex):_mutex(mutex)
    {}

    void Lock()
    {
        pthread_mutex_lock(_mutex);
    }

    void UnLock()
    {
        pthread_mutex_unlock(_mutex);
    }

    ~Mutex()
    {}
private:
    pthread_mutex_t *_mutex;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex): _lock(mutex)
    {
        _lock.Lock();
    }

    ~LockGuard()
    {
        _lock.UnLock();
    }
private:
    Mutex _lock;
};

Blockqueue.hpp 文件

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "LockGuard.hpp"

using namespace std;
const int CAPACITY = 5;

template<class T>
class BlockQueue
{
public:
    BlockQueue(int cap = CAPACITY):_capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p, nullptr);
        pthread_cond_init(&_c, nullptr);
    }

    bool isFull()
    {
        return _bq.size() == _capacity;
    }

    void Push(const T &in)
    {
        LockGuard mutex(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while(isFull())
        {
            pthread_cond_wait(&_p, &_mutex);
        }

        _bq.push(in);
        // 唤醒策略为 生产一个,消费一个
        pthread_cond_signal(&_c);

        //pthread_mutex_unlock(&_mutex);
    }

    bool isEmpty()
    {
        return _bq.size() == 0;
    }

    void Pop(T *out)
    {
        LockGuard mutex(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while(isEmpty())
        {
            pthread_cond_wait(&_c, &_mutex);
        }

        *out = _bq.front();
        _bq.pop();
        // 唤醒策略为 消费一个,生产一个
        pthread_cond_signal(&_p);

        //pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p);
        pthread_cond_destroy(&_c);
    }
private:
    queue<T> _bq;
    int _capacity;

    pthread_mutex_t _mutex;
    pthread_cond_t _p;
    pthread_cond_t _c;
};

Task.hpp 文件

#pragma once
#include <string>

const char *opers = "+-*/%";

enum
{
    ok = 0,
    div_zero,
    mod_zero
};

class Task
{
public:
    Task()
    {}

    Task(int x, int y, char op) : _data_x(x), _data_y(y), _oper(op)
    {
        _code = ok;
    }

    void Run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data_x + _data_y;
            break;
        case '-':
            _result = _data_x - _data_y;
            break;
        case '*':
            _result = _data_x * _data_y;
            break;
        case '/':
            {
                if(_data_y == 0)
                {
                    _code = div_zero;
                }
                else
                {
                    _result = _data_x / _data_y;
                }
            }
            break;
        case '%':
            {
                if(_data_y == 0)
                {
                    _code = mod_zero;
                }
                else
                {
                    _result = _data_x % _data_y;
                }
            }
            break;
        default:
            break;
        }
    }

    void operator()()
    {
        Run();
    }

    std::string PrintTask()
    {
        std::string ret = std::to_string(_data_x);
        ret += _oper;
        ret += std::to_string(_data_y);

        ret += "=?";
        return ret;
    }

    std::string PrintResult()
    {
        std::string ret = std::to_string(_data_x);
        ret += _oper;
        ret += std::to_string(_data_y);

        ret += "=";
        if(_code == ok)
        {
            ret += std::to_string(_result);
        }
        else
        {
            ret += "?";
        }

        ret += "[";
        ret += std::to_string(_code);
        ret += "]";

        return ret;
    }

    ~Task()
    {}

private:
    int _data_x;
    int _data_y;
    char _oper;

    int _result;
    int _code; // 错误码
};

Main.cc 文件

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;


void *producer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    // 产生任务
    while (true)
    {
        int x = rand() % 10 + 1;
        int y = rand() % 10 + 1;
        char oper = opers[rand() % strlen(opers)];

        Task task(x, y, oper);
        cout << "producer: " << task.PrintTask() << endl;
        bq->Push(task);
        sleep(1);
    }

    return nullptr;
}

void *consumer(void *args)
{
    // usleep(1000);
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    // 获取任务,处理任务
    while (true)
    {
        if (bq->isFull())
        {
            Task task;
            bq->Pop(&task);

            task();
            cout << "consumer: " << task.PrintResult() << endl;
            //sleep(1);
        }
    }
}

int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<Task> bq;

    pthread_t p;
    pthread_create(&p, nullptr, producer, (void *)&bq);

    pthread_t c;
    pthread_create(&c, nullptr, consumer, (void *)&bq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);
    return 0;
}

linux:生产者消费者模型-LMLPHP
linux:生产者消费者模型-LMLPHP

那如何将这个单生产单消费该为多生产多消费呢?因为多生产多消费本质也是多个线程访问临界资源,那我们单生产和单消费不也是多个线程访问临界资源吗,所以我们不需要对BlockQueue.hpp文件进行修改,只需要在main函数中,创建多个生产者和消费者即可。

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include <string.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
using namespace std;

template <class T>
class ThreadData
{
public:
    ThreadData(pthread_t tid, const string threadname, BlockQueue<T> *bq)
        : _tid(tid), _threadname(threadname), _bq(bq)
    {}

public:
    pthread_t _tid;
    string _threadname;
    BlockQueue<T>* _bq;
};

void *producer(void *args)
{
    ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
    // 产生任务
    while (true)
    {
        int x = rand() % 10 + 1;
        int y = rand() % 10 + 1;
        char oper = opers[rand() % strlen(opers)];

        Task task(x, y, oper);
        cout << data->_tid << ", " << data->_threadname <<": " << task.PrintTask() << endl;
        data->_bq->Push(task);
        sleep(1);
    }

    return nullptr;
}

void *consumer(void *args)
{
    // usleep(1000);
    ThreadData<Task> *data = static_cast<ThreadData<Task> *>(args);
    // 获取任务,处理任务
    while (true)
    {
        if (data->_bq->isFull())
        {
            Task task;
            data->_bq->Pop(&task);

            task();
            cout << data->_tid << ", " << data->_threadname << ": " << task.PrintResult() << endl;
            // sleep(1);
        }
    }
}



int main()
{
    srand(time(nullptr) ^ getpid());
    BlockQueue<Task> bq;

    pthread_t p1;
    ThreadData<Task> data1(p1, "product-1", &bq);
    pthread_create(&p1, nullptr, producer, (void *)&data1);

    pthread_t p2;
    ThreadData<Task> data2(p2, "product-2", &bq);
    pthread_create(&p2, nullptr, producer, (void *)&data2);

    pthread_t c1;
    ThreadData<Task> data3(c1, "consumer-1", &bq);
    pthread_create(&c1, nullptr, consumer, (void *)&data3);
    pthread_t c2;
    ThreadData<Task> data4(c2, "consumer-2", &bq);
    pthread_create(&c2, nullptr, consumer, (void *)&data4);

    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);

    return 0;
}

linux:生产者消费者模型-LMLPHP
linux:生产者消费者模型-LMLPHP


总结

以上就是我对于线程同步的总结。

linux:生产者消费者模型-LMLPHP

04-02 00:32