关于生产者消费者模型

生产者消费者问题是一个经典的问题,用于多进程同步,即各个线程之间的同步。在生产者消费者问题中,有一个生产者负责生产某种物品,还有一个消费者负责消费生产者生产的产品。生产者和消费者共享一个固定大小的内存缓冲区。生产者的工作是生产数据,将其放入缓冲区,然后再次开始生成数据。而消费者的工作是从缓冲区中消费数据。

生产者和消费者之间的关系可以简单的描述为:

  • 生产者生产数据或资源,并将其放入缓冲区。
  • 消费者从缓冲区中取出资源,并进行相应的处理。

为什么要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产数据之后不用等待消费者处理,直接放入阻塞队列中,消费者也不必找生产者要数据,而是直接通过阻塞来取数据,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型-LMLPHP

在上述对生产者和消费者模型的描述中,该模型中可能会出现的问题:

  • 当缓冲区为满时,生产者不应该被允许向缓冲区中放入任何数据。
  • 当缓冲区为空时,消费者不应该被允许从缓冲区中读取任何数据。
  • 生产者和消费者不应该同时访问缓冲区。

解决上述问题的一种常见方法就是使用互斥锁和条件变量。生产者在向缓冲区中放入数据之前,需要获取互斥锁,并检查缓冲区是否已满。如果缓冲区已满,生产者会等待条件变量,直到有空间可用。消费者在从缓冲区中取出数据之前,也需要获取互斥锁,并检查缓冲区是否为空。如果缓冲区为空,消费者会等待条件变量,直到有数据可用。

生产者消费者模型的优点:

解耦 :生产者和消费者是独立的实体,彼此之间通过缓冲区进行交互。这种解耦使生产者和消费者可以独自进行操作,而不需要依赖对方的操作或状态。提高了代码的可维护性。

支持并发 :生产者消费者模型能够支持多个生产者和多个消费者并发运行。这种并发性能够提高系统的吞吐量,提高整体性能。

支持忙闲不均:生产者和消费者能够有效处理它们之间的忙闲不均。当生产者的生产速度超过消费者的消费速度时,多余的数据或资源可以暂时存储在缓冲区中,以供消费者以后使用。而当消费者的消费速度超过生产者的生产速度时,缓冲区可以暂时保存消费者暂未消费的数据或资源。这种能力使得系统能够平衡生产者和消费者之间的速度差异,提高了系统的稳定性和性能。

生产者消费者模型的特点

生产者消费者模型是一个经典的多线程同步与互斥的场景。其特点如下:

:生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(同步互斥关系)。

:生产者和消费者。

:通常为内存中的一段缓冲区,用于存储生产者生产的数据和消费者消耗的数据。

当我们编写生产者消费者模型时,需要对以上三点进行维护。

🎯 生产者和生产者、消费者和消费者,生产者和消费者之间存在互斥关系的原因就是因为它们都需要访问共享的临界资源,即缓冲区。因多个执行流同时访问缓冲区资源可能导致数据竞争和不一致等问题,为了保证数据的正确性,需要使用互斥锁对临界区进行保护。

🎯生产者和消费者之间存在同步关系是为了确保生产者和消费者在合适的时间进行操作,避免不必要的等待和资源浪费。为确保生产者和消费者能够顺序操作,需要满足以下两个条件:

  • 生产者需要在缓冲区有多余空间时进行生产数据。
  • 消费者需要在缓冲区非空时进行消费数据。

通过同步关系,生产者和消费者可以在合适时间进行操作,避免生产者生产的数据丢失或覆盖,以及消费者的错误操作问题。
同步关系可以通过条件变量、信号量等机制实现。

基于BlockingQueue的生产者消费者模型

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列为满时,往队列中存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

生产者消费者模型-LMLPHP

总的来说,阻塞队列在以下场景中发挥重要作用:

  • 生产者和消费者的速度不同:阻塞队列可以协调生产者和消费者之间的速度差异。
  • 线程之间的协作和通信:阻塞队列提供了一种简单有效的方式,让生产者线程和消费者线程在无需手动实现锁和条件变量的情况下进行协作和通信。
  • 资源控制和线程调度:阻塞队列可以用于资源控制和访问顺序。

下面我们使用STL库中的 queue 来对实现一个单生产者单消费者的模型:

#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <cstdlib>
#include <pthread.h>
using namespace std;

const uint32_t gDefaultCap = 5; // 阻塞队列默认容量
template <class T>
class BlockQueue
{
public:
    BlockQueue(uint32_t cap = gDefaultCap) : cap_(cap)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&consumerCond_, nullptr);
        pthread_cond_init(&producerCond_, nullptr);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&consumerCond_);
        pthread_cond_destroy(&producerCond_);
    }

    // 生产者接口
    void push(const T &in) // const &:纯输入
    {
        lockQueue();
        while (isFull()) // isFull就是我们在临界区设定的条件
        {
            // before:当等待的时候,会自动释放mutex_
            proBlockWait(); // 阻塞等待,等待被唤醒。被唤醒!=条件被满足
            // after:当被唤醒时,是在临界区中醒来的
        }
        // 条件满足,可以生产数据
        pushCore(in);
        unlockQueue();
        wakeupCon(); // 唤醒消费者
    }

    // 消费者接口
    T pop()
    {
        lockQueue();
        while (isEmpty())
        {
            conBlockWait();
        }
        // 条件满足,可以消费数据
        T tmp = popCore();
        unlockQueue();
        wakeupPro(); // 唤醒生产者
        return tmp;
    }

private:
    void lockQueue() { pthread_mutex_lock(&mutex_); }
    void unlockQueue() { pthread_mutex_unlock(&mutex_); }
    bool isEmpty() { return bq_.empty(); }
    bool isFull() { return bq_.size() == cap_; }

    // 生产者一定是在临界区中的;
    // 1.在阻塞线程的时候,会自动释放mutex_锁
    // 2.当阻塞结束返回的时候,pthread_cond_wait会自动重新获得mutex_,然后才返回
    void proBlockWait() { pthread_cond_wait(&producerCond_, &mutex_); }
    void conBlockWait() { pthread_cond_wait(&consumerCond_, &mutex_); }

    void wakeupCon() { pthread_cond_signal(&consumerCond_); }
    void wakeupPro() { pthread_cond_signal(&producerCond_); }
    void pushCore(const T &in) { bq_.push(in); }
    T popCore()
    {
        T tmp = bq_.front();
        bq_.pop();
        return tmp;
    }

private:
    uint32_t cap_;                 // 容量
    queue<T> bq_;                  // blockqueue
    pthread_mutex_t mutex_;        // 保护阻塞队列的互斥锁
    pthread_cond_t consumerCond_;  // 让消费者等待的条件变量
    pthread_cond_t producerCond_; // 让生产者等待的条件变量
};

说明:

  • 上述使用互斥锁来保护阻塞队列这个临界资源。互斥锁确保了在同一时刻只有一个线程能够访问阻塞队列。当一个线程获得了互斥锁之后,其它线程就必须等待该线程释放互斥锁之后才能够再次访问。
  • 在生产者线程向阻塞队列中添加数据时,如果阻塞队列已经满了,那么该生产者线程就需要进入等待状态,直到阻塞队列中有空间时再将其唤醒。为了实现该功能,使用了一个条件变量来描述阻塞队列是否已经满了,当阻塞队列满了时,生产者线程就需要在对应的条件变量下等待。等待时,该线程会自动释放手中的锁。
  • 调用 pthread_cond_wait 函数时,需要传入线程手中的互斥锁。这是为了避免死锁问题。如果在等待的过程中不释放互斥锁,那么其它的线程就无法获得互斥锁,也就无法访问阻塞队列。
  • 当生产者线程成功向阻塞队列中添加了一个数据时,就需要唤醒在 consumerCond_ 条件变量下等待的消费者线程。这是因为此时阻塞队列中已经有了至少一个数据可以被消费了。同样的,当消费者线程成功从阻塞队列中取出了一个数据时,就需要唤醒在 productorCond_ 条件变量下等待的生产者线程。这是因为此时阻塞队列中已经有了至少一个空间可以被生产者线程使用了。

当判断生产者何消费者条件是否满足是应该用 while,而不是 if 。为什么?

  1. 条件变量的假唤醒:在某些情况下,即使没有明确的调用 pthread_cond_signal() 或 pthread_cond_broadcast(),pthread_cond_wait() 也可能会返回。者被称为假唤醒。若使用 if 语句进行判断,那么在假唤醒的情况下,线程可能也会继续执行,尽管条件并没有满足。使用 while 则可以在每次唤醒时重新检测条件,从而避免这种情况的发生。
  2. 防止竞态条件:在多线程环境下,可能会出现多个线程同时被唤醒并试图操作共享数据的情况。使用 while 循环可以在每次被唤醒时检测条件。

对上述的单生产者单消费者模型进行测试,让生产者生产数据,消费者消费生产者生成的数据:

#include "blockqueue.hpp"
#include <ctime>

void *consumer(void *args)
{
    BlockQueue<int> *bqp = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        int data = bqp->pop();
        cout << "consumer 消费了一个数据:" << data << endl;
    }
}

void *producer(void *args)
{
    BlockQueue<int> *bqp = static_cast<BlockQueue<int> *>(args);
    while (true)
    {
        // 1.制作数据
        int data = rand() % 10;
        // 2.生产数据
        bqp->push(data);
        cout << "producer 生产数据完成:" << data << endl;
        sleep(1);
    }
}

int main()
{
    BlockQueue<int> bq;
    bq.push(7);
    cout << bq.pop() << endl;
    return 0;
}

测试结果如下:

生产者消费者模型-LMLPHP

基于计算型任务的生产者消费者模型

在实现上述的生产者消费者模型时,我们将其中存入队列的数据进行了模板化,因此可以在 block queue 中放入其它类型的数据或者特定任务。

下面实现一个基于简单计算任务的生产者消费者模型,在这里我们定义一个任务类(Task),该类用于处理用户给出的运算任务:

#pragma once

#include <iostream>
#include <string>

class Task
{
public:
    Task(int one = 0, int two = 0, char op = '0')
        : elemOne_(one), elemTwo_(two), operator_(op)
    {
    }

    int operator()() { return run(); }

    int run()
    {
        int result = 0;
        switch (operator_)
        {
        case '+':
            result = elemOne_ + elemTwo_;
            break;
        case '-':
            result = elemOne_ - elemTwo_;
            break;
        case '*':
            result = elemOne_ * elemTwo_;
            break;
        case '/':
            if (elemTwo_ == 0)
            {
                std::cout << "div zero,about " << std::endl;
                result = -1;
            }
            else
                result = elemOne_ / elemTwo_;
            break;
        case '%':
            if (elemTwo_ == 0)
            {
                std::cout << "mod zero,about " << std::endl;
                result = -1;
            }
            else
                result = elemOne_ % elemTwo_;
            break;
        default:
            std::cout << "非法操作:" << operator_ << std::endl;
            break;
        }
        return result;
    }

    int get(int *e1, int *e2, char *op)
    {
        *e1 = elemOne_;
        *e2 = elemTwo_;
        *op = operator_;
    }

private:
    int elemOne_;
    int elemTwo_;
    char operator_;
};

将 Task 对象放入生产者消费者模型中的队列,消费者从中拿取任务并进行相应的计算,如下:

#include "blockqueue.hpp"
#include "task.hpp"
#include <ctime>

const std::string ops = "+-*/%";
void *producer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        // 1.制作任务
        int one = rand() % 50;
        int two = rand() % 50;
        char op = ops[rand() % ops.size()];
        Task t(one, two, op);
        // 2.生产数据
        bq->push(t);
        cout << "producer task : " << one << " " << op << " " << two << " = ?" << endl;
        sleep(1);
    }
}

void *consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        Task t = bq->pop();
        int result = t(); // 调用仿函数处理任务
        int one, two;
        char op;
        t.get(&one, &two, &op);
        cout << "consumer calculate result : " << one << " " << op << " " << two << " = " << result << endl;
    }
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<Task> bq;

    pthread_t c, p;
    pthread_create(&p, nullptr, producer, &bq);
    pthread_create(&c, nullptr, consumer, &bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

运行代码结果如下:

生产者消费者模型-LMLPHP

POSIX信号量

POSIX 信号量和 System V 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。然而,POSIX 信号量还可以用于线程间的同步,而 System V 信号量主要用于进程间的同步。

信号量的 PV 操作:

  • P 操作(申请操作)是对信号量进行申请或获取的操作。它的目的是申请获得临界资源的使用权限。在信号量的计数器上执行 P 操作时,计数器的值会 -1,表示使用资源的数量减少。
  • V 操作(释放操作)是对信号量进行释放的操作。它的目的是归还临界资源的使用权限。在信号量的计数器上执行 V 操作时,计数器的值会 +1,表示资源使用数量的增加。

通过使用 P 操作和 V 操作,可以实现对临界资源的控制和同步。当一个进程或线程需要访问临界资源时,它会执行 P 操作来申请资源的使用权限。若资源可用,P 操作成功,计数器减少,进程或线程可以继续访问临界资源。若资源不可以,P 操作会阻塞进程或线程,直到资源可用为止。

当进程或线程完成对临界资源的访问时,它会执行 V 操作来释放资源的使用权限。V 操作会增加计数器的值,使得其它等待的进程或线程可以继续申请资源的使用权限。

🎯由于信号量的 PV 操作中涉及到对信号量值的额修改,如果这些操作不是原子的,可能会导致多个进程或线程同时修改信号量的值,从而引发数据不一致的问题。而信号量本身也是临界资源,因此 PV 操作必须是原子操作。

初始化信号量

sem_init() 函数用于初始化信号量。它的函数原型如下:

#include<semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
  • 其中 sem 参数是执行一个 sem_t 类型的信号变量的指针。
  • pshared 参数用于指定信号量在进程间共享还是线程间共享(0表示线程间共享,非零表示进程间共享)。
  • value 参数用于指定信号量的初始值。

如果函数调用成功则返回 0,错误则返回 -1。

销毁信号量

sem_destroy() 函数用于销毁一个已经初始化过的信号量。它的函数原型如下:

int sem_destroy(sem_t *sem);
  • sem:信号量变量的指针,用于指向需要销毁的信号量。

函数调用成功则返回 0,失败则返回 -1。

注意:只有在所有的线程或进程都不需要使用该信号量时,才应该调用该函数来销毁信号量。否则可能会出现未定义的行为。

等待信号量

sem_wait() 函数用于等待一个信号量,会将信号量的值减 1。它的函数原型如下:

int sem_wait(sem_t *sem);  // P()

发布信号量

sem_post() 用于发布一个信号量,表示资源使用完毕,可以归还资源了。将信号量的值加1。它的函数原型如下:

int sem_post(sem_t *sem); // V()

sem_post() 函数会将信号量的值加 1,如果有其它线程或进程正在等待该信号量,则会唤醒其中的一个线程或进程。

基于RingQueue的生产者消费者模型

环形队列采用数组模拟,用模运算来模拟环状特性:
生产者消费者模型-LMLPHP

环形结构起始状态和结束状态都是不一样的,不好判断为空或者为满,所以可以通过加计数器或标记为来判断满或者空。另外也可以预留一个空的位置,作为满的状态。但是我们现在有信号量这个计数器,就可以很简单的进行多线程间的同步过程。

以下是基于环形队列实现的生产者消费者模型:

#pragma once
#include <iostream>
#include <string>
#include <semaphore.h>
#include <vector>

using namespace std;

const int gCapacity = 5;

template <class T>
class RingQueue
{
public:
    RingQueue(int cap = gCapacity) : ringqueue_(cap), pIndex_(0), cIndex_(0)
    {
        // 生产者信号量初始值为队列容量
        sem_init(&roomSem_, 0, ringqueue_.size());
        // 消费者信号量初始值为0
        sem_init(&dataSem_, 0, 0);

        pthread_mutex_init(&pMutex_, nullptr);
        pthread_mutex_init(&cIndex_, nullptr);
    }

    ~RingQueue()
    {
        sem_destroy(&roomSem_);
        sem_destroy(&dataSem_);

        pthread_mutex_destroy(&pMutex_);
        pthread_mutex_destroy(&cMutex_);
    }

    void push(const T &in)
    {
        // 申请一个空间,如果没有空间则等待
        sem_wait(&roomSem_);
        pthread_mutex_lock(&pMutex_);

        // 写入数据
        ringqueue_[pIndex_] = in;
        pIndex_++;
        pIndex_ %= ringqueue_.size();

        pthread_mutex_unlock(&pMutex_);
        // 通知消费者有数据可以读取
        sem_post(&dataSem_);
    }

    T pop()
    {
        // 申请一个数据,若没有数据则等待
        sem_wait(&dataSem_);
        pthread_mutex_lock(&cMutex_);

        // 读取数据
        T temp = ringqueue_[cIndex_];
        cIndex_++;
        cIndex_ %= ringqueue_.size();

        pthread_mutex_unlock(&cMutex_);
        // 通知生产者有空间可写
        sem_post(&roomSem_);
        return temp;
    }

private:
    vector<T> ringqueue_;    // 环形队列
    sem_t roomSem_;          // 衡量空间计数器,producer,用于控制生产者
    sem_t dataSem_;          // 衡量数据计数器,consumer,用于控制消费者
    uint32_t pIndex_;        // 当前生产者写入的位置,如果是多线程,pIndex_也是临界资源
    uint32_t cIndex_;        // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源
    pthread_mutex_t pMutex_; // 生产者的互斥锁
    pthread_mutex_t cMutex_; // 消费者的互斥锁
};

说明:

  • 使用 vector 容器来实现环形队列,通过指定的队列容量和两个标记 pIndex_ 和 cIndex_ 来控制环形队列的读写操作。
  • 在环形队列中使用信号量来控制生产者和消费者的行为。roomSem_ 用于控制生产者的行为,它的初始值为队列容量,表示缓冲区中有多少个空闲的位置可以写入数据。dataSem_用于控制消费者的行为,它的初始值为 0,表示缓冲区中没有数据可以读取。
  • 使用互斥锁来确保线程的安全。
  • 生产者使用 push 函数向缓冲区中写入数据,在写入数据之前,生产者需要申请一个空闲位置,如果没有空闲位置则等待。消费者使用 pop 函数从缓冲区中读取数据,在读取数据之前,消费者需要申请一个可读数据,如果没有可读数据则等待。
  • 通过互斥量和信号量的配合,实现了生产者和消费者的协同和同步。

接下来使用下列代码对生产者消费者模型进行测试:

#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>

void *producer(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int data = rand() % 10;
        rqp->push(data);
        cout << "pthread[" << pthread_self() << "] 生产了一个数据:" << data << endl;
        sleep(1);
    }
}

void *consumer(void *args)
{
    RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        int data = rqp->pop();
        cout << "pthread[" << pthread_self() << "] 消费了一个数据:" << data << endl;
    }
}

int main()
{
    srand((unsigned long)time(nullptr) ^ getpid());

    RingQueue<int> rq;

    pthread_t c1, c2, c3, p1, p2, p3;
    pthread_create(&p1, nullptr, producer, &rq);
    pthread_create(&p2, nullptr, producer, &rq);
    pthread_create(&p3, nullptr, producer, &rq);
    pthread_create(&c1, nullptr, consumer, &rq);
    pthread_create(&c2, nullptr, consumer, &rq);
    pthread_create(&c3, nullptr, consumer, &rq);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);

    return 0;
}

测试结果如下:

生产者消费者模型-LMLPHP

11-16 08:43