引言

生產者消費者是一個經典的模式

利用生產者,消費者和緩衝區降低了生產者和消費者之間的的耦合度

便於對生產者和消費者的修改

下面記錄的是一個經典的單一生產者多消費者的模式

設計思路

以隊列做為緩衝區,實現產品的FIFO

生產者調用緩衝區的push函數,將產品加入緩衝區

消費者調用緩衝區的pop函數,將產品從緩衝區取出

因為生產者與消費者分屬於不同的線程,所以要設置鎖

類的聲明

class CacheQueue
{
private:
/**
* @brief 緩衝隊列
*/
queue<int>* _requests; /**
* @brief 互斥鎖
**/
pthread_mutex_t _mutex; /**
* @brief Queue not full conditional object
**/
pthread_cond_t _not_full_cond; /**
* @brief Queue not empty conditional object
**/
pthread_cond_t _not_empty_cond; uint32_t _bufSize; public: ChacheQueue(); void SetMaxLength(uint32_t bufSize);
/**
* @brief 向隊列添加產品
* @param [in] req: 待添加的產品
**/
void Push(int req); /**
* @brief 從隊列中取出一個產品
* @param [return] : 從隊列中取出的產品
**/
int Pop(uint32_t timeout); /**
* @brief 析構函數
**/
~CacheQueue();
};

重要的函數是Push和Pop,生產者調用Push向緩衝區添加產品,消費者則調用Pop函數獲取產品

線程條件_not_full_cond表示隊列不滿,可以添加產品

線程條件_not_empty_cond表示隊列不空,可以獲取產品

Push函數

void CacheQueue::Push(int req)
{
/**
* 上鎖
*/
pthread_mutex_lock(&_mutex); /**
* 如果隊列滿,等待信號
*/
while (_requests->size() == _bufSize)
{
pthread_cond_wait(&_not_full_cond, &_mutex);
}
_requests->push(req); /**
* 發送非空信號
*/
pthread_cond_signal(&_not_empty_cond); /**
* 解鎖
*/
pthread_mutex_unlock(&_mutex);
}

Pop函數

int CacheQueue::Pop(uint32_t timeout)
{
int ret = 0;
int req = NO_DATA;
/**
* 上鎖
*/
pthread_mutex_lock(&_mutex);
/**
* 若隊列空等待指定時間
*/
struct timeval now;
struct timespec timepass;
gettimeofday(&now, NULL);
timepass.tv_sec = now.tv_sec + timeout;
timepass.tv_nsec = 0;
while (ret == 0 && _requests->empty())
{
ret = pthread_cond_timedwait(&_not_empty_cond, &_mutex, &timepass);
}
/**
* 沒有數據,返回沒有數據標識
*/
if(ret!=0)
{
pthread_mutex_unlock(&_mutex);
return req;
}
/**
* 返回數據,發送隊列非滿信號
*/
req = _requests->front();
_requests->pop();
pthread_cond_signal(&_not_full_cond);
/**
* 解鎖
*/
pthread_mutex_unlock(&_mutex);
return req;
}
04-15 16:47