目录
不安全的队列测试
下方是一个简单的程序,但是不安全:
由于代码中的线程t是在后台运行的,所以无法确定线程t是否已经完成了对myQ队列的操作,因此在主线程中处理myQ队列时,可能会出现竞争条件或者数据不一致的情况,导致输出的结果不确定。
#include<iostream>
#include<thread>
#include<queue>
using namespace std;
void InsertData(queue<int>& data, int num)
{
data.push(num);
}
void test()
{
queue<int> myQ;
for (int i = 0; i < 10; i++)
{
thread t(InsertData,ref(myQ), ref(i));
//t.join();用join是没毛病的
t.detach();//但是驻留后台,就不一定会稳定,线程会不安全
}
this_thread::sleep_for(3s);
while (!myQ.empty())
{
cout << myQ.front() << endl;
myQ.pop();
}
}
int main()
{
test();
return 0;
}
简单封装一个线程安全队列
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
template<class T>
class Thread_Safe_Queue
{
public:
Thread_Safe_Queue() {};//构造函数
Thread_Safe_Queue(Thread_Safe_Queue const& other) //拷贝函数
{
lock_guard<mutex> CopyLock(other.mut);//自动加锁
DataQueue = other.DataQueue;
};
void PushData(T NewData)//入数据
{
lock_guard<mutex> CopyLock(mut);//自动加锁
DataQueue.push(NewData);
DataCond.notify_one();//数据存储完毕后,随机唤醒另一个准备完毕的线程
}
//等待数据插入结束后在执行数据的出队过程
void WaitPop(T& OldData)
{
unique_lock<mutex> CopyLock(mut);//手动加锁
DataCond.wait(CopyLock, [this] { return !DataQueue.empty() || DataReady; });//等到数据不为空或者有新数据到来的时候,再进行出队的过程
if (!DataQueue.empty())
{
OldData = DataQueue.front();
DataQueue.pop();
}
DataReady = false; // 重置标志
}
shared_ptr<T> WaitPop()//重载一下
{
unique_lock<mutex> CopyLock(mut);//手动加锁
DataCond.wait(CopyLock, [this] { return !DataQueue.empty() || DataReady; });//等到数据不为空或者有新数据到来的时候,再进行出队的过程
if (!DataQueue.empty())
{
shared_ptr<T> res(make_shared<T>(DataQueue.front()));
DataQueue.pop();
DataReady = false; // 重置标志
return res;
}
return nullptr; // 返回nullptr表示没有数据可取
}
bool empty() const //判断是否为空
{
lock_guard<mutex> lk(mut);//加锁
return DataQueue.empty();
}
protected:
mutable mutex mut;//互斥锁,用于保护队列DataQueue的访问
queue<T> DataQueue;//数据队列
condition_variable DataCond;//用于线程之间的同步,等待某个条件的发生
bool DataReady = false; // 标记是否有新数据到来
};
//测试函数
void testInsert(Thread_Safe_Queue<int>& data, int num)
{
data.PushData(num);
}
void ThreadSafeTest()
{
Thread_Safe_Queue<int> myQ;
for (int i = 0; i < 1500; i++)
{
thread t(testInsert, ref(myQ), ref(i));
t.detach();
}
//this_thread::sleep_for(1s);
int num = 0;
while (true)
{
auto res = myQ.WaitPop();
if (res == nullptr)
break;
cout << *res << "\n";
}
}
int main()
{
ThreadSafeTest();
return 0;
}