body, table{font-family: 微软雅黑; font-size: 10pt}
table{border-collapse: collapse; border: solid gray; border-width: 2px 0 2px 0;}
th{border: 1px solid gray; padding: 4px; background-color: #DDD;}
td{border: 1px solid gray; padding: 4px;}
tr:nth-child(2n){background-color: #f8f8f8;}
基于对象:
Noncopyable.h | MutexLock.h |
#ifndef __NONCOPYABLE_H__ #define __NONCOPYABLE_H__ #include<iostream> using namespace std; namespace meihao { class Noncopyable { protected: Noncopyable(){} ~Noncopyable(){} private: Noncopyable(const Noncopyable&); Noncopyable& operator=(const Noncopyable&); }; }; #endif | #ifndef __MUTEXLOCK_H__ #define __MUTEXLOCK_H__ #include<iostream> #include"Noncopyable.h" #include<pthread.h> using namespace std; namespace meihao { class MutexLock:private Noncopyable { public: MutexLock(); ~MutexLock(); void lock(); void unlock(); pthread_mutex_t* getMutexPtr(); private: pthread_mutex_t _mutex; }; }; #endif |
Condition.h | Buffer.h |
#ifndef __CONDITION_H__ #define __CONDITION_H__ #include<iostream> #include"Noncopyable.h" #include<pthread.h> using namespace std; namespace meihao { class MutexLock; class Condition:private Noncopyable { public: Condition(MutexLock& mutex):_mutex(mutex) { pthread_cond_init(&_cond,NULL); } ~Condition(); void wait(); void notify(); void notifyall(); private: pthread_cond_t _cond; MutexLock& _mutex; }; }; #endif | #ifndef __BUFFER_H__ #define __BUFFER_H__ #include<iostream> #include"MutexLock.h" #include"Condition.h" #include<queue> using namespace std; namespace meihao { typedef int DataType; class Buffer { public: Buffer(int); ~Buffer(); void push(DataType); DataType pop(); bool full(); bool empty(); private: MutexLock _mutex; Condition _notFull; Condition _notEmpty; queue<DataType> _que; int _size; }; }; #endif |
thread.h | MutexLock.cpp |
#ifndef __THREAD_H__ #define __THREAD_H__ #include<iostream> #include<pthread.h> #include<functional> #include"Noncopyable.h" using namespace std; namespace meihao { typedef std::function<void()> ThreadCallback; class Thread:private Noncopyable { public: Thread(ThreadCallback cb):_pthId(0),_isRunning(false),_cb(cb){} ~Thread(); void start(); void join(); static void* threadFunc(void*); private: pthread_t _pthId; bool _isRunning; ThreadCallback _cb; }; }; #endif | #include<iostream> #include"MutexLock.h" using namespace std; namespace meihao { MutexLock::MutexLock() { pthread_mutex_init(&_mutex,NULL); } MutexLock::~MutexLock() { pthread_mutex_destroy(&_mutex); } void MutexLock::lock() { pthread_mutex_lock(&_mutex); } void MutexLock::unlock() { pthread_mutex_unlock(&_mutex); } pthread_mutex_t* MutexLock::getMutexPtr() { return &_mutex; } }; |
Condition.cpp | Buffer.cpp |
#include<iostream> #include"Condition.h" #include"MutexLock.h" using namespace std; namespace meihao { Condition::~Condition() { pthread_cond_destroy(&_cond); } void Condition::wait() { pthread_cond_wait(&_cond,_mutex.getMutexPtr()); } void Condition::notify() { pthread_cond_signal(&_cond); } void Condition::notifyall() { pthread_cond_broadcast(&_cond); } }; | #include<iostream> #include"Buffer.h" using namespace std; namespace meihao { Buffer::Buffer(int size):_mutex(),_notFull(_mutex),_notEmpty(_mutex),_size(size){} Buffer::~Buffer(){} bool Buffer::full() { return _size == _que.size(); } bool Buffer::empty() { return _que.size() == 0; // 这个地方脑子坏了,写成_size == 0; 导致后面pop等待作废 } void Buffer::push(DataType value) { _mutex.lock(); if(full()) { _notFull.wait(); // 等待条件变量,队列没有满,唤醒 } _que.push(value); cout<<"prodece a num "<<value<<endl; _notEmpty.notify(); // 有一个元素进队列,_notEmpty的条件变量满足 _mutex.unlock(); } DataType Buffer::pop() { _mutex.lock(); if(empty()) { _notEmpty.wait(); // 等待不空的条件变量_notEmpty满足 } DataType tmp = _que.front(); _que.pop(); cout<<"consumer a num "<<tmp<<endl; _notFull.notify(); _mutex.unlock(); return tmp; } }; |
thread.cpp | test.cpp |
#include<iostream> #include"Thread.h" using namespace std; namespace meihao { Thread::~Thread() { pthread_detach(_pthId); } void Thread::start() { pthread_create(&_pthId,NULL,&Thread::threadFunc,this); // 调用成员函数的this指针也要传递过去 _isRunning = true; } void Thread::join() { if(_isRunning) { pthread_join(_pthId,NULL); _isRunning = false; } } void* Thread::threadFunc(void* arg) { Thread* pthread = static_cast<Thread*> (arg); if(NULL!=pthread) { pthread->_cb(); } } }; | #include<iostream> #include"Buffer.h" #include<time.h> #include<unistd.h> #include"Thread.h" using namespace std; struct Producer { void produce(meihao::Buffer& buffer) { ::srand(time(NULL)); int i = 0; while(i<5) { int num = rand()%100; buffer.push(num); //cout<<"prodece a num "<<num<<endl; // 放在这里可能导致不一致了 sleep(1); i++; } } }; struct Consumer { void consumer(meihao::Buffer& buffer) { int i = 0; while(i<5) { int num = buffer.pop(); //cout<<"consumer a num "<<num<<endl; sleep(2); i++; } } }; int main() { Producer p1; Consumer c1; meihao::Buffer buffer(5); meihao::Thread produceThread(bind(&Producer::produce,&p1,ref(buffer))); //生产者线程 meihao::Thread consumThread(bind(&Consumer::consumer,&c1,ref(buffer))); //消费者线程 produceThread.start(); consumThread.start(); produceThread.join(); consumThread.join(); return 0; } |
面向对象:
Nocopyable.h | MutexLock.h |
#ifndef __NONCOPYABLE_H__ #define __NONCOPYABLE_H__ #include<iostream> using namespace std; namespace meihao { class Noncopyable { protected: Noncopyable(){} ~Noncopyable(){} private: Noncopyable(const Noncopyable&); Noncopyable& operator=(const Noncopyable&); }; }; #endif | #ifndef __MUTEXLOCK_H__ #define __MUTEXLOCK_H__ #include<iostream> #include"Noncopyable.h" #include<pthread.h> using namespace std; namespace meihao { class MutexLock:private Noncopyable { public: MutexLock(); ~MutexLock(); void lock(); void unlock(); pthread_mutex_t* getMutexPtr(); private: pthread_mutex_t _mutex; }; }; #endif |
Condition.h | Buffer.h |
#ifndef __CONDITION_H__ #define __CONDITION_H__ #include<iostream> #include"Noncopyable.h" #include<pthread.h> using namespace std; namespace meihao { class MutexLock; class Condition:private Noncopyable { public: Condition(MutexLock& mutex):_mutex(mutex) { pthread_cond_init(&_cond,NULL); } ~Condition(); void wait(); void notify(); void notifyall(); private: pthread_cond_t _cond; MutexLock& _mutex; }; }; #endif | #ifndef __BUFFER_H__ #define __BUFFER_H__ #include<iostream> #include"MutexLock.h" #include"Condition.h" #include<queue> using namespace std; namespace meihao { typedef int DataType; class Buffer { public: Buffer(int); ~Buffer(); void push(DataType); DataType pop(); bool full(); bool empty(); private: MutexLock _mutex; Condition _notFull; Condition _notEmpty; queue<DataType> _que; int _size; }; }; #endif |
Thread.h | ProduceThread.h |
#ifndef __THREAD_H__ #define __THREAD_H__ #include<iostream> #include"Noncopyable.h" #include<pthread.h> using namespace std; namespace meihao { class Thread:private Noncopyable { public: Thread(); void start(); void join(); virtual void run() = 0; virtual ~Thread(); static void* threadFunc(void*); private: pthread_t _pthId; bool _isRunning; }; }; #endif | #ifndef __PRODUCETHREAD_H__ #define __PRODUCETHREAD_H__ #include<iostream> #include"thread.h" #include"Buffer.h" using namespace std; namespace meihao { class ProduceThread:public Thread { public: ProduceThread(Buffer& buff):_buff(buff){} void run(); private: Buffer& _buff; }; }; #endif |
ConsumerThread.h | thread.cpp |
#ifndef __CONSUMERTHREAD_H__ #define __CONSUMERTHREAD_H__ #include<iostream> #include"thread.h" #include"Buffer.h" using namespace std; namespace meihao { class ConsumerThread:public Thread { public: ConsumerThread(Buffer& buff):_buff(buff){} void run(); private: Buffer& _buff; }; }; #endif | #include<iostream> #include"thread.h" using namespace std; namespace meihao { Thread::Thread():_pthId(0),_isRunning(false){} void Thread::start() { pthread_create(&_pthId,NULL,&Thread::threadFunc,this); _isRunning = true; } void Thread::join() { if(_isRunning) { pthread_join(_pthId,NULL); } } Thread::~Thread() { pthread_detach(_pthId); } void* Thread::threadFunc(void* arg) { Thread* pthread = static_cast<Thread*> (arg); if(NULL!=pthread) { pthread->run(); } } }; |
ProduceThread.cpp | ConsumerThread.cpp |
#include<iostream> #include"ProduceThread.h" #include<time.h> #include<unistd.h> using namespace std; namespace meihao { void ProduceThread::run() { ::srand(time(NULL)); int i = 0; while(i<5) { int num = ::rand()%100; _buff.push(num); cout<<"produce a num "<<num<<endl; sleep(1); i++; } } }; | #include<iostream> #include"ConsumerThread.h" #include<unistd.h> using namespace std; namespace meihao { void ConsumerThread::run() { int i = 0; while(i<5) { int num = _buff.pop(); cout<<"Consumer a num "<<num<<endl; sleep(2); i++; } } }; |
MutexLock.cpp | Condition.cpp |
#include<iostream> #include"MutexLock.h" using namespace std; namespace meihao { MutexLock::MutexLock() { pthread_mutex_init(&_mutex,NULL); } MutexLock::~MutexLock() { pthread_mutex_destroy(&_mutex); } void MutexLock::lock() { pthread_mutex_lock(&_mutex); } void MutexLock::unlock() { pthread_mutex_unlock(&_mutex); } pthread_mutex_t* MutexLock::getMutexPtr() { return &_mutex; } }; | #include<iostream> #include"Condition.h" #include"MutexLock.h" using namespace std; namespace meihao { Condition::~Condition() { pthread_cond_destroy(&_cond); } void Condition::wait() { pthread_cond_wait(&_cond,_mutex.getMutexPtr()); } void Condition::notify() { pthread_cond_signal(&_cond); } void Condition::notifyall() { pthread_cond_broadcast(&_cond); } }; |
Buffer.cpp | test.cpp |
#include<iostream> #include"Buffer.h" using namespace std; namespace meihao { Buffer::Buffer(int size):_mutex(),_notFull(_mutex),_notEmpty(_mutex),_size(size){} Buffer::~Buffer(){} bool Buffer::full() { return _size == _que.size(); } bool Buffer::empty() { return _que.size() == 0; // 这个地方脑子坏了,写成_size == 0; 导致后面pop等待作废 } void Buffer::push(DataType value) { _mutex.lock(); if(full()) { _notFull.wait(); // 等待条件变量,队列没有满,唤醒 } _que.push(value); _notEmpty.notify(); // 有一个元素进队列,_notEmpty的条件变量满足 _mutex.unlock(); } DataType Buffer::pop() { _mutex.lock(); if(empty()) { _notEmpty.wait(); // 等待不空的条件变量_notEmpty满足 } DataType tmp = _que.front(); _que.pop(); _notFull.notify(); _mutex.unlock(); return tmp; } }; | #include<iostream> #include"ProduceThread.h" #include"ConsumerThread.h" using namespace std; int main() { meihao::Buffer buffer(10); meihao::Thread* produce = new meihao::ProduceThread(buffer); meihao::Thread* consumer = new meihao::ConsumerThread(buffer); produce->start(); consumer->start(); produce->join(); consumer->join(); delete produce; delete consumer; return 0; } |