我正在尝试实现一个使用两个线程的类:一个用于生产者,一个用于消费者。当前实现不使用锁:

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>

using Queue =
        boost::lockfree::spsc_queue<
            int,
            boost::lockfree::capacity<1024>>;

class Worker
{
public:
    Worker() : working_(false), done_(false) {}
    ~Worker() {
        done_ = true;    // exit even if the work has not been completed
        worker_.join();
    }

    void enqueue(int value) {
        queue_.push(value);
        if (!working_) {
            working_ = true;
            worker_ = std::thread([this]{ work(); });
        }
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_ = false;
    }

private:
    std::atomic<bool> working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

该应用程序需要将工作项排队一定时间,然后休眠以等待事件。这是模拟行为的最小主体:
int main()
{
    Worker w;
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

我非常确定自己的实现存在错误:如果工作线程完成并且在执行working_ = false之前,又出现另一个enqueue,该怎么办?是否可以在不使用锁的情况下使我的代码线程安全?

该解决方案需要:
  • 快速加入
  • 即使队列不为空,析构函数也必须退出
  • 没有忙碌的等待,因为工作线程处于空闲状态的时间很长
  • 如果可能的话,没有锁

  • 编辑

    根据您的建议,我对Worker类进行了另一种实现。这是我的第二次尝试:
    class Worker
    {
    public:
        Worker()
            : working_(ATOMIC_FLAG_INIT), done_(false) { }
    
        ~Worker() {
            // exit even if the work has not been completed
            done_ = true;
            if (worker_.joinable())
                worker_.join();
        }
    
        bool enqueue(int value) {
            bool enqueued = queue_.push(value);
            if (!working_.test_and_set()) {
                if (worker_.joinable())
                    worker_.join();
                worker_ = std::thread([this]{ work(); });
            }
            return enqueued;
        }
    
        void work() {
            int value;
            while (!done_ && queue_.pop(value)) {
                std::cout << value << std::endl;
            }
            working_.clear();
            while (!done_ && queue_.pop(value)) {
                std::cout << value << std::endl;
            }
        }
    
    private:
        std::atomic_flag working_;
        std::atomic<bool> done_;
        Queue queue_;
        std::thread worker_;
    };
    

    我在worker_.join()方法中介绍了enqueue。这可能会影响性能,但是在极少数情况下(当队列为空且线程退出之前,会出现另一个enqueue)。 working_变量现在是在atomic_flag中设置并在enqueue中清除的work。需要while之后的Additional working_.clear(),因为如果在clear之前但在while之后推送了另一个值,则不会处理该值。

    这个实现正确吗?

    我做了一些测试,实现似乎正常。

    OT:最好将其作为编辑或答案吗?

    最佳答案



    然后,该值将被推送到队列,但直到设置了该标志后将另一个值加入队列,才将其处理。您(或您的用户)可以决定是否可以接受。使用锁可以避免这种情况,但是它们违反了您的要求。

    如果正在运行的线程即将完成并设置working_ = false;,但在将下一个值加入队列之前没有停止运行,则该代码可能会失败。在这种情况下,您的代码将在正在运行的线程上调用operator=,这将根据链接的文档对std::terminate进行调用。

    在将工作程序分配给新线程之前添加worker_.join()应该可以避免这种情况。

    另一个问题是,如果队列已满,则queue_.push可能会失败,因为它的大小是固定的。当前,您只是忽略大小写,并且该值不会添加到完整队列中。如果等待队列中有空间,则不会快速进入队列(在边缘情况下)。您可以采用push返回的 bool 值(告诉它是否成功),然后从enqueue返回。这样,调用方可以决定是要等待还是放弃该值。

    或使用非固定大小的队列。 Boost对这种选择有以下说法:

    10-05 18:53