我正在尝试实现一个使用两个线程的类:一个用于生产者,一个用于消费者。当前实现不使用锁:
#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
using Queue =
boost::lockfree::spsc_queue<
int,
boost::lockfree::capacity<1024>>;
class Worker
{
public:
Worker() : working_(false), done_(false) {}
~Worker() {
done_ = true; // exit even if the work has not been completed
worker_.join();
}
void enqueue(int value) {
queue_.push(value);
if (!working_) {
working_ = true;
worker_ = std::thread([this]{ work(); });
}
}
void work() {
int value;
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
working_ = false;
}
private:
std::atomic<bool> working_;
std::atomic<bool> done_;
Queue queue_;
std::thread worker_;
};
该应用程序需要将工作项排队一定时间,然后休眠以等待事件。这是模拟行为的最小主体:
int main()
{
Worker w;
for (int i = 0; i < 1000; ++i)
w.enqueue(i);
std::this_thread::sleep_for(std::chrono::seconds(1));
for (int i = 0; i < 1000; ++i)
w.enqueue(i);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
我非常确定自己的实现存在错误:如果工作线程完成并且在执行
working_ = false
之前,又出现另一个enqueue
,该怎么办?是否可以在不使用锁的情况下使我的代码线程安全?该解决方案需要:
编辑
根据您的建议,我对
Worker
类进行了另一种实现。这是我的第二次尝试:class Worker
{
public:
Worker()
: working_(ATOMIC_FLAG_INIT), done_(false) { }
~Worker() {
// exit even if the work has not been completed
done_ = true;
if (worker_.joinable())
worker_.join();
}
bool enqueue(int value) {
bool enqueued = queue_.push(value);
if (!working_.test_and_set()) {
if (worker_.joinable())
worker_.join();
worker_ = std::thread([this]{ work(); });
}
return enqueued;
}
void work() {
int value;
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
working_.clear();
while (!done_ && queue_.pop(value)) {
std::cout << value << std::endl;
}
}
private:
std::atomic_flag working_;
std::atomic<bool> done_;
Queue queue_;
std::thread worker_;
};
我在
worker_.join()
方法中介绍了enqueue
。这可能会影响性能,但是在极少数情况下(当队列为空且线程退出之前,会出现另一个enqueue
)。 working_
变量现在是在atomic_flag
中设置并在enqueue
中清除的work
。需要while
之后的Additional working_.clear()
,因为如果在clear
之前但在while
之后推送了另一个值,则不会处理该值。这个实现正确吗?
我做了一些测试,实现似乎正常。
OT:最好将其作为编辑或答案吗?
最佳答案
然后,该值将被推送到队列,但直到设置了该标志后将另一个值加入队列,才将其处理。您(或您的用户)可以决定是否可以接受。使用锁可以避免这种情况,但是它们违反了您的要求。
如果正在运行的线程即将完成并设置working_ = false;
,但在将下一个值加入队列之前没有停止运行,则该代码可能会失败。在这种情况下,您的代码将在正在运行的线程上调用operator=,这将根据链接的文档对std::terminate
进行调用。
在将工作程序分配给新线程之前添加worker_.join()
应该可以避免这种情况。
另一个问题是,如果队列已满,则queue_.push
可能会失败,因为它的大小是固定的。当前,您只是忽略大小写,并且该值不会添加到完整队列中。如果等待队列中有空间,则不会快速进入队列(在边缘情况下)。您可以采用push
返回的 bool 值(告诉它是否成功),然后从enqueue
返回。这样,调用方可以决定是要等待还是放弃该值。
或使用非固定大小的队列。 Boost对这种选择有以下说法: