目录

不安全的队列测试

简单封装一个线程安全队列


不安全的队列测试

下方是一个简单的程序,但是不安全:

由于代码中的线程t是在后台运行的,所以无法确定线程t是否已经完成了对myQ队列的操作,因此在主线程中处理myQ队列时,可能会出现竞争条件或者数据不一致的情况,导致输出的结果不确定。

#include<iostream>
#include<thread>
#include<queue>
using namespace std;
void InsertData(queue<int>& data, int num)
{
	data.push(num);
}
void test()
{
	queue<int> myQ;
	for (int i = 0; i < 10; i++)
	{
		thread t(InsertData,ref(myQ), ref(i));
		//t.join();用join是没毛病的
		t.detach();//但是驻留后台,就不一定会稳定,线程会不安全
	}
	this_thread::sleep_for(3s);
	while (!myQ.empty())
	{
		cout << myQ.front() << endl;
		myQ.pop();
	}
}

int main()
{
	test();

	return 0;
}

简单封装一个线程安全队列

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
template<class T>
class Thread_Safe_Queue
{
public:
	Thread_Safe_Queue() {};//构造函数
	Thread_Safe_Queue(Thread_Safe_Queue const& other) //拷贝函数
	{
		lock_guard<mutex> CopyLock(other.mut);//自动加锁
		DataQueue = other.DataQueue;
	};
	void PushData(T NewData)//入数据
	{
		lock_guard<mutex> CopyLock(mut);//自动加锁
		DataQueue.push(NewData);
		DataCond.notify_one();//数据存储完毕后,随机唤醒另一个准备完毕的线程
	}
	//等待数据插入结束后在执行数据的出队过程
	void WaitPop(T& OldData)
	{
		unique_lock<mutex> CopyLock(mut);//手动加锁
		DataCond.wait(CopyLock, [this] { return !DataQueue.empty() || DataReady; });//等到数据不为空或者有新数据到来的时候,再进行出队的过程
		if (!DataQueue.empty())
		{
			OldData = DataQueue.front();
			DataQueue.pop();
		}
		DataReady = false; // 重置标志
	}
	shared_ptr<T> WaitPop()//重载一下
	{
		unique_lock<mutex> CopyLock(mut);//手动加锁
		DataCond.wait(CopyLock, [this] { return !DataQueue.empty() || DataReady; });//等到数据不为空或者有新数据到来的时候,再进行出队的过程
		if (!DataQueue.empty())
		{
			shared_ptr<T> res(make_shared<T>(DataQueue.front()));
			DataQueue.pop();
			DataReady = false; // 重置标志
			return res;
		}
		return nullptr; // 返回nullptr表示没有数据可取
	}
	bool empty() const //判断是否为空
	{
		lock_guard<mutex> lk(mut);//加锁
		return DataQueue.empty();
	}
protected:
	mutable mutex mut;//互斥锁,用于保护队列DataQueue的访问
	queue<T> DataQueue;//数据队列
	condition_variable DataCond;//用于线程之间的同步,等待某个条件的发生
	bool DataReady = false; // 标记是否有新数据到来
};

//测试函数
void testInsert(Thread_Safe_Queue<int>& data, int num)
{
	data.PushData(num);
}

void ThreadSafeTest()
{
	Thread_Safe_Queue<int> myQ;
	for (int i = 0; i < 1500; i++)
	{
		thread t(testInsert, ref(myQ), ref(i));
		t.detach();
	}
	//this_thread::sleep_for(1s);
	int num = 0;
	while (true)
	{
		auto res = myQ.WaitPop();
		if (res == nullptr)
			break;
		cout << *res << "\n";
	}
}

int main()
{
	ThreadSafeTest();
	return 0;
}
07-08 01:44