概要
笔者根据 Anthony Williams 在《C++并发编程实战》中所述,
尝试调整了 《简单的线程池(七)》 中提及的非阻塞互助式线程池的实现方案,用 std::deque 替换 std::queue 作为存储工作任务的底层数据结构,并新定义非阻塞线程安全的双端队列 Lockwise_Deque<>。
工作任务始终被添加至双端队列的尾部。为了验证从缓存的角度是否可以提高性能,笔者用四种方式实现工作任务的获取,
- 当前工作线程从自己的任务队列的头部获取任务,如无工作任务则从其他任务队列的头部获取任务(A);
- 当前工作线程从自己的任务队列的尾部获取任务,如无工作任务则从其他任务队列的头部获取任务(B);
- 当前工作线程从自己的任务队列的头部获取任务,如无工作任务则从其他任务队列的尾部获取任务(C);
- 当前工作线程从自己的任务队列的尾部获取任务,如无工作任务则从其他任务队列的尾部获取任务(D)。
A 方式即为最初的非阻塞互助式,差别在于底层数据结构由 std::queue 更改为 std::deque;B 方式即按照 Anthony 所述的实现;C 和 D 是笔者为了全面确认缓存对性能的影响而增加的方式。
本文不再赘诉与 《简单的线程池(七)》 相同的内容。如有不明之处,请参考该博客。
实现
以下代码给出了非阻塞线程安全双端队列的实现,(lockwise_deque.h)
template<class T>
class Lockwise_Deque {
private:
struct Spinlock_Mutex { ...
} mutable _m_;
deque<T> _q_;
public:
void push(T&& element) {
lock_guard<Spinlock_Mutex> lk(_m_);
_q_.push_back(std::move(element)); // #1
}
bool pop(T& element) {
lock_guard<Spinlock_Mutex> lk(_m_);
if (_q_.empty())
return false;
element = std::move(_q_.front()); // #2
_q_.pop_front();
return true;
}
bool pull(T& element) {
lock_guard<Spinlock_Mutex> lk(_m_);
if (_q_.empty())
return false;
element = std::move(_q_.back()); // #3
_q_.pop_back();
return true;
}
...
};
push(T&&) 函数实现从双端队列尾部添加任务(#1),pop(T&) 函数实现从双端队列头部获取任务(#2),pull(T&) 函数实现从双端队列尾部获取任务(#3)。
以下代码给出 A 方式的实现,(lockwise_mutual_2a_pool.h)
class Thread_Pool {
...
void work(unsigned index) {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
if (_workerqueues_[index].pop(task)) // #1
task();
else
for (unsigned i = 0; i < _workersize_; ++i)
if (_workerqueues_[(index + i + 1) % _workersize_].pop(task)) { // #2
task();
break;
}
while (_suspend_.load(memory_order_acquire))
std::this_thread::yield();
}
}
...
}
当前工作线程从自己的任务队列的头部获取工作任务(#1),如无工作任务则从其他任务队列的头部获取任务(#2)。
以下代码给出 B 方式的实现,(lockwise_mutual_2b_pool.h)
class Thread_Pool {
...
void work(unsigned index) {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
if (_workerqueues_[index].pull(task)) // #1
task();
else
for (unsigned i = 0; i < _workersize_; ++i)
if (_workerqueues_[(index + i + 1) % _workersize_].pop(task)) { // #2
task();
break;
}
while (_suspend_.load(memory_order_acquire))
std::this_thread::yield();
}
}
...
}
当前工作线程从自己的任务队列的尾部获取工作任务(#1),如无工作任务则从其他任务队列的头部获取任务(#2)。
以下代码给出 C 方式的实现,(lockwise_mutual_2c_pool.h)
class Thread_Pool {
...
void work(unsigned index) {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
if (_workerqueues_[index].pop(task)) // #1
task();
else
for (unsigned i = 0; i < _workersize_; ++i)
if (_workerqueues_[(index + i + 1) % _workersize_].pull(task)) { // #2
task();
break;
}
while (_suspend_.load(memory_order_acquire))
std::this_thread::yield();
}
}
...
}
当前工作线程从自己的任务队列的头部获取工作任务(#1),如无工作任务则从其他任务队列的尾部获取任务(#2)。
以下代码给出 D 方式的实现,(lockwise_mutual_2d_pool.h)
class Thread_Pool {
...
void work(unsigned index) {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
if (_workerqueues_[index].pull(task)) // #1
task();
else
for (unsigned i = 0; i < _workersize_; ++i)
if (_workerqueues_[(index + i + 1) % _workersize_].pull(task)) { // #2
task();
break;
}
while (_suspend_.load(memory_order_acquire))
std::this_thread::yield();
}
}
...
}
当前工作线程从自己的任务队列的尾部获取工作任务(#1),如无工作任务则从其他任务队列的尾部获取任务(#2)。
逻辑
以下类图展现了此线程池的代码主要逻辑结构。
线程池用户提交任务与工作线程执行任务的并发过程与 《简单的线程池(一)》 中的一致,此处略。
验证
验证过程采用了 《简单的线程池(三)》 中定义的的测试用例。笔者对比了测试结果与 《简单的线程池(七)》 的数据,结果如下,
图1 列举了 吞吐量1的差异 在 0.5 分钟、1 分钟和 3 分钟的提交周期内不同思考时间上的对比。
【注】四种非阻塞线程安全的双端队列略称为 LM2A、LM2B、LM2C 和 LM2D,下同。
图2 列举了 吞吐量2的差异 在 0.5 分钟、1 分钟和 3 分钟的提交周期内不同思考时间上的对比。
图3 列举了 吞吐量3的差异 在 0.5 分钟、1 分钟和 3 分钟的提交周期内不同思考时间上的对比。
可以看到,B 方式的非阻塞互助式线程池与 A、C、D 方式的以及普通队列的非阻塞互助式在吞吐量上的表现近乎一致。
基于以上的对比分析,笔者认为,在非阻塞式线程池中采用双端队列,
这样的论述是值得讨论的。
最后
完整的代码示例和测试数据请参考 [github] cnblogs/15723078 。
作者参考了 C++并发编程实战 / (美)威廉姆斯 (Williams, A.) 著; 周全等译. - 北京: 人民邮电出版社, 2015.6 (2016.4重印) 一书中的部分设计思路。致 Anthony Williams、周全等译者。