问题描述
我正在使用concurrent_bounded_queue
Intel TBB 4.1 Update 3进行生产者线程和使用者线程之间的通信:
I am using concurrent_bounded_queue
Intel TBB 4.1 Update 3 for communication between producer and consumer threads:
- concurrent_queue
- concurrent_bounded_queue
队列类有一个名为abort
的方法,该方法将tbb::user_abort
扔给在队列实例的pop
和push
上阻塞的所有线程.两个线程之间的通信可能如下所示:
The queue class has a method called abort
which throws tbb::user_abort
to all threads which block on pop
and push
of the queue instance. The communication between two threads may look like this:
ConsThread | ProdThread
-----------+-------------
q.pop | get new data
(wait) | q.push
process | get new data
q.pop | no more data!
(wait) | q.abort
quit | quit
不幸的是,即使在这个简单的示例中,我也无法使用它来可靠地关闭队列,因为如果某些使用者在调用abort
之前未完成处理先前pop
ped的数据,他们将完成迭代并返回到pop
上的阻止:
Unfortunately, I cannot use it to reliably shut down the queue even in this simple example, because if some of the consumers are not done processing previously pop
ped data before the call to abort
, they will finish the iteration and return to blocking on pop
:
ConsThread | ProdThread
-----------+-------------
q.pop | get new data
(wait) | q.push
process | get new data
process | no more data!
process | q.abort
process | quit
process |
q.pop |
(wait) |
(wait) |
(wait) |
(so lonely)|
现在,我正在使用一个令人作呕的黑客,它会产生另一个未分离的线程(该线程加入了使用者池线程),并等待它完成,同时不时发送更多abort
给后来者:
Right now I am employing a moderately disgusting hack that spawns another non-detached thread (which joins the consumer pool threads) and waits for it to finish while sending more abort
s from time to time for the late comers:
bool areConsumerThreadsJoinedThankYou = false;
std::thread joiner(Joiner(consumerPool, &areConsumerThreadsJoinedThankYou));
while (!areConsumerThreadsJoinedThankYou) {
rawQueue.abort();
MAGIC_MSLEEP(100);
}
class Joiner
的实现方式很多
void Joiner::operator()(void)
{
for (auto it = this->m_threadPool.begin();
it < this->m_threadPool.end();
it++)
(*it)->join();
this->m_done = true;
*(this->m_flag) = true;
}
这当然很丑.还有更根本的解决方案吗?
This of course is very ugly. Is there a more fundamental solution?
推荐答案
创建一个指定的"EndOfData"项.如果您知道有K个使用者,则在完成推送数据项后,让生产者推送K个"EndOfData"项.让每个使用者在弹出"EndOfData"项后退出.
Create a designated "EndOfData" item. If you know you have K consumers, have the producer push K "EndOfData" items after it is done pushing data items. Have each consumer quit after it pops a "EndOfData" item.
如果事先不知道K,请让生产者推送单个"EndOfData"项.然后让每个弹出"EndOfData"项目的消费者在离开之前推送另一个"EndOfData"项目.完成所有使用者操作后,将剩下一个"EndOfData"项,该项将在销毁队列时销毁.
If K is not known in advance, have the producer push a single "EndOfData" item. Then have each consumer who pops an "EndOfData" item push another "EndOfData" item before leaving. After all consumers are done, there will be a single "EndOfData" item remaining, which will be destroyed when the queue is destroyed.
这篇关于线程构建基块parallel_bounded_queue —如何“关闭"它?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!