Problem description
In a distributed job system written in C++11 I have implemented a fence (i.e. a thread outside the worker thread pool may ask to block until all currently scheduled jobs are done) using the following structure:
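```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>

struct fence
{
    std::atomic<std::size_t> counter;   // threads that still have to reach the fence
    std::mutex resume_mutex;
    std::condition_variable resume;     // notified when counter drops to zero

    fence(std::size_t num_threads)
        : counter(num_threads)
    {}
};
```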
The code that implements the fence is as follows:
```cpp
void task_pool::fence_impl(void *arg)
{
    auto f = (fence *)arg;
    if (--f->counter == 0)      // (1)
        // we have zeroed this fence's counter, wake up everyone that waits
        f->resume.notify_all(); // (2)
    else
    {
        unique_lock<mutex> lock(f->resume_mutex);
        f->resume.wait(lock);   // (3)
    }
}
```
This works very well if threads enter the fence over a period of time. However, if they try to do it almost simultaneously, it sometimes seems to happen that between the atomic decrement (1) and the start of the wait on the condition variable (3), the thread yields CPU time, and another thread decrements the counter to zero (1) and fires the condition variable (2). This results in the first thread waiting forever in (3), because it starts waiting only after it has already been notified.
A hack to make the thing workable is to put a 10 ms sleep just before (2), but that's unacceptable for obvious reasons.
Any suggestions on how to fix this in a performant way?
Recommended answer
Your diagnosis is correct: this code is prone to losing condition notifications in the way you described. After one thread has decremented the counter but before it starts waiting on the condition variable (whether or not it has already locked the mutex, since the notifying thread never takes that mutex), another thread may call notify_all(), so the first thread misses that notification.
A simple fix is to lock the mutex before decrementing the counter and while notifying:
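```cpp
void task_pool::fence_impl(void *arg)
{
    auto f = static_cast<fence*>(arg);
    // Hold the mutex across the decrement, the notification and the check
    // before waiting, so no notify_all() can slip in between them.
    std::unique_lock<std::mutex> lock(f->resume_mutex);
    if (--f->counter == 0) {
        f->resume.notify_all();
    }
    else do {
        f->resume.wait(lock);   // releases the mutex while blocked
    } while(f->counter);        // re-check the counter after every wakeup
}
```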
In this case the counter need not be atomic, because it is only ever read or modified while resume_mutex is held.
An added bonus (or penalty, depending on the point of view) of locking the mutex before notifying is (from here):
Regarding the while loop (from):