我想进行多线程处理,其中线程一将数据传递给处理数据的4-5个工作线程,而所有工作线程都完成了,我想继续。我正在使用boost来实现,但是我有一个同步问题。意思是程序在某一时刻停止并且不再继续工作。
我以前使用过OpenMP,但效果很好,但是我想单独设置线程优先级,因此我无法弄清楚如何使用OpenMP进行操作,因此我使用自己的解决方案:
如果有人可以提示您找到此代码中的错误,或者可以帮助我找到解决该问题的另一种方法,我将感到非常高兴。
谢谢,
公斤
#include <QCoreApplication>
#include <boost/thread.hpp>
#define N_CORE 6
#define N_POINTS 10
#define N_RUNS 100000
class Sema{
public:
Sema(int _n =0): m_count(_n),m_mut(),m_cond(){}
void set(int _n)
{
boost::unique_lock<boost::mutex> w_lock(m_mut);
m_count = -_n;
}
void wait()
{
boost::unique_lock<boost::mutex> lock(m_mut);
while (m_count < 0)
{
m_cond.wait(lock);
}
--m_count;
}
void post()
{
boost::unique_lock<boost::mutex> lock(m_mut);
++m_count;
m_cond.notify_all();
}
private:
boost::condition_variable m_cond;
boost::mutex m_mut;
int m_count;
};
class Pool
{
private:
boost::thread m_WorkerThread;
boost::condition_variable m_startWork;
bool m_WorkerRun;
bool m_InnerRun;
Sema * m_sem;
std::vector<int> *m_Ep;
std::vector<int> m_ret;
void calc()
{
unsigned int no_pt(m_Ep->size());
std::vector<int> c_ret;
for(unsigned int i=0;i<no_pt;i++)
c_ret.push_back(100 + m_Ep->at(i));
m_ret = c_ret;
}
void run()
{
boost::mutex WaitWorker_MUTEX;
while(m_WorkerRun)
{
boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX);
m_startWork.wait(u_lock);
calc();
m_sem->post();
}
}
public:
Pool():m_WorkerRun(false),m_InnerRun(false){}
~Pool(){}
void start(Sema * _sem){
m_WorkerRun = true;
m_sem = _sem;
m_ret.clear();
m_WorkerThread = boost::thread(&Pool::run, this);}
void stop(){m_WorkerRun = false;}
void join(){m_WorkerThread.join();}
void newWork(std::vector<int> &Ep)
{
m_Ep = &Ep;
m_startWork.notify_all();
}
std::vector<int> getWork(){return m_ret;}
};
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
Pool TP[N_CORE];
Sema _sem(0);
for(int k=0;k<N_CORE;k++)
TP[k].start(&_sem);
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
std::vector<int> V[N_CORE];
for(int k=0;k<N_CORE;k++)
for(int i=0;i<N_POINTS;i++)
{
V[k].push_back((k+1)*1000+i);
}
for(int j=0;j<N_RUNS;j++)
{
_sem.set(N_CORE);
for(int k=0;k<N_CORE;k++)
{
TP[k].newWork(V[k]);
}
_sem.wait();
for(int k=0;k<N_CORE;k++)
{
V[k].clear();
V[k]=TP[k].getWork();
if(V[k].size()!=N_POINTS)
std::cout<<"ERROR: "<<"V["<<k<<"].size(): "<<V[k].size()<<std::endl;
}
if((j+1)%100==0)
std::cout<<"LOOP: "<<j+1<<std::endl;
}
std::cout<<"FINISHED: "<<std::endl;
return a.exec();
}
最佳答案
您需要在Pool::newWork()
和Pool::run()
的调用之间进行比赛。
您必须记住,发信号/广播条件变量不是粘性事件。如果您的线程在发信号时没有等待条件变量,则信号将丢失。这就是程序中可能发生的情况:没有什么可以阻止主线程在每个Pool对象有时间在您的条件变量上调用Pool::newWork()
之前调用wait()
的。
要解决此问题,您需要将boost::mutex WaitWorker_MUTEX
移为类成员,而不是将其作为局部变量。 Pool::newWork()
需要先获取该互斥体,然后再进行更新:
boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX);
m_Ep = &Ep;
m_startWork.notify(); // no need to use notify_all()
由于您在
Pool::run()
中使用了条件变量,因此您需要处理虚假唤醒。我建议在构造对象时以及每次完成工作项时将m_Ep设置为NULL:boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX);
while (1) {
while (m_Ep == NULL && m_workerRun) {
m_startWork.wait(u_lock);
}
if (!m_workerRun) {
return;
}
calc();
m_sem->post();
m_Ep = NULL;
}
stop()将需要获取互斥体并通知():
boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX);
m_workRun = false;
m_startWork.notify();
这些更改将使10毫秒不必要的 sleep 时间。您似乎没有调用
Pool::stop()
或Pool::join()
。您应该更改代码以调用它们。通过处理
m_ret
中的Pool::calc()
,与复制结果末尾相比,您还将获得更好的性能。当您退还工作时,您也在复印。您可能希望Pool::getWork()
返回const ref到m_ret
。我没有运行此代码,因此可能还有其他问题。它应该可以帮助您移动
从您的代码看来,您可能想知道为什么条件变量需要与互斥锁并存(因为您在
Pool::run()
中声明了一个局部互斥锁)。我希望我的解决方法可以使它更清晰。关于c++ - 使用Boost C++的多线程-同步问题,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/16572643/