一.条件变量
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
创建条件变量
cond:要初始化的条件变量。
attr:NULL。
等待条件满足
唤醒等待
一个示例
我想让每一个线程依次对一个全局变量cnt做++操作。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include <iostream>
using namespace std;
pthread_mutex_t mutex=PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP;//初始化锁
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;//初始化条件变量
int cnt=0;
void *route(void *arg)
{
char *id =(char*)arg;
while(1)
{
pthread_mutex_lock(&mutex);//上锁
pthread_cond_wait(&cond,&mutex);//为什么在这等待?因为它在让线程等待时会自动释放锁
cout<<id<<",cnt:"<<cnt++<<endl;
pthread_mutex_unlock(&mutex);//解锁
}
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void*)"thread 1");
pthread_create(&t2, NULL, route, (void*)"thread 2");
pthread_create(&t3, NULL, route, (void*)"thread 3");
pthread_create(&t4, NULL, route, (void*)"thread 4");
sleep(1);
cout<<"main thread begin:"<<endl;
while(1)
{
sleep(1);
pthread_cond_signal(&cond);//唤醒正在等待的一个线程,默认是第一个
//pthread_cond_broadcast(&cond);//唤醒所有线程
cout<<"signal one pthread...."<<endl;
}
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
pthread_mutex_destroy(&mutex);//销毁
return 0;
}
可以看到到此,所创建的线程均以一定顺序对cnt进行了++操作。注意这里的顺序不一定是1234,也有可能是2341…但它一定呈现周期性。
二.生产者和消费者模型
1.概念和特点
模型特点:
3种关系:消费者和消费者,生产者和生产者,生产者和消费者。
2种角色:消费者和生产者。
1个交易场所:特定的内存空间。
2.实现基于阻塞队列的生产者消费者模型
实现一个cp模型
设置水平线,当产品数量低于水平线时开始生产,当产品数量高于水平线时开始消费。
BlockQueue.hpp
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
template<class T>
class Blockqueue
{
public:
Blockqueue(int capacity=20)
:capacity_(capacity)//默认容量是20
{
pthread_mutex_init(&mutex_, nullptr);//初始化锁
pthread_cond_init(&c_cond_, nullptr);//初始化消费者条件变量
pthread_cond_init(&p_cond_, nullptr);//初始化生产者条件变量
water_=capacity_/3;//水平线
}
T pop()
{
pthread_mutex_lock(&mutex_);//上锁
if(q_.size()==0)
{
//如果没有产品,消费者开始等待
pthread_cond_wait(&c_cond_,&mutex_);
}
T out=q_.front();
q_.pop();
if(q_.size()<water_)
{
//如果数量小于水平线,唤醒生产者
pthread_cond_signal(&p_cond_);
}
pthread_mutex_unlock(&mutex_);//解锁
return out;
}
void push(const T&in)
{
pthread_mutex_lock(&mutex_);//上锁
if(q_.size()>capacity_)
{
//如果数量高于容量,生产者开始等待
pthread_cond_wait(&p_cond_,&mutex_);
}
q_.push(in);
if(q_.size()>water_)
{
//如果数量高于水平线,唤醒消费者
pthread_cond_signal(&c_cond_);
}
pthread_mutex_unlock(&mutex_);//解锁
}
~Blockqueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 共享资源
int capacity_; //容量
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;
pthread_cond_t p_cond_;
int water_;
};
main.cc
#include"Blockqueue.hpp"
void*Constumer(void*args)//消费者
{
Blockqueue<int> *bq=static_cast<Blockqueue<int>*>(args);//强转
while(1)
{
int out=bq->pop();
std::cout<<"消费了一个数据:"<<out<<std::endl;
}
}
void*Producer(void*args)//生产者
{
Blockqueue<int> *bq=static_cast<Blockqueue<int>*>(args);
int data=0;
while(1)
{
sleep(1);
bq->push(data);
std::cout<<"生产了一个数据:"<<data++<<std::endl;
}
}
int main()
{
Blockqueue<int> *dp=new Blockqueue<int>();
pthread_t t,c;
pthread_create(&t,nullptr,Constumer,(void*)dp);
pthread_create(&c,nullptr,Producer,(void*)dp);
pthread_join(c,nullptr);
pthread_join(t,nullptr);
delete dp;
return 0;
}