编辑:

我现在已经完成了队列(克服了下面描述的问题,等等)。对于那些感兴趣的人,可以找到here。我很高兴听到任何评论:)。请注意,队列不仅是工作项队列,而且是模板容器,当然可以用工作项实例化该容器。

原版的:

在C ++ 11和14中观察并发性的Herb Sutter's talk之后,我对非阻塞并发性感到非常兴奋。

但是,我还无法找到我认为是基本问题的解决方案。所以,如果这已经在这里,请和我保持温和。

我的问题很简单。我正在创建一个非常简单的线程池。为此,我在workPool类中运行了一些工作线程。我保留了workItems的列表。

如何以无锁方式添加工作项。

实现此目的的非锁定方式当然是创建互斥体。如果添加一个项目并在当前工作项目完成后阅读(当然也要锁定)列表,则将其锁定。

但是,我不知道如何以无锁方式进行此操作。

下面是我正在创建的大致概念。我为这个问题编写的代码。而且它既不完整,也没有错误:)

#include <thread>
#include <deque>
#include <vector>

class workPool
{
public:
    workPool(int workerCount) :
        running(1)
    {
        for (int i = workerCount; i > 0; --i)
            workers.push_back(std::thread(&workPool::doWork, this));
    }

    ~workPool()
    {
        running = 0;
    }
private:
    bool running;
    std::vector< std::thread > workers;
    std::deque< std::function<void()> > workItems;

    void doWork()
    {
        while (running)
        {
            (*workItems.begin())();
            workItems.erase(workItems.begin());
            if (!workItems.size())
                //here the thread should be paused till a new item is added
        }


    }

    void addWorkitem()
    {
        //This is my confusion. How should I do this?
    }

};

最佳答案

我最近看过赫伯的谈话,我相​​信他的lock-free linked list应该很好。唯一的问题是atomic< shared_ptr<T> >尚未实现。正如Herb在他的演讲中所解释的那样,我已经使用了atomic_*函数调用。

在示例中,我将任务简化为一个int,但是它可以是您想要的任何东西。

函数atomic_compare_exchange_weak带有三个参数:要比较的项目,期望值和期望值。它返回true或false表示成功或失败。失败时,期望值将更改为找到的值。

#include <memory>
#include <atomic>

// Untested code.

struct WorkItem { // Simple linked list implementation.
    int work;
    shared_ptr<WorkItem> next; // remember to use as atomic
};

class WorkList {
    shared_ptr<WorkItem> head; // remember to use as atomic
public:
    // Used by producers to add work to the list. This implementation adds
    // new items to the front (stack), but it can easily be changed to a queue.
    void push_work(int work) {
        shared_ptr<WorkItem> p(new WorkItem()); // The new item we want to add.
        p->work = work;
        p->next = head;

        // Do we get to change head to p?
        while (!atomic_compare_exchange_weak(&head, &p->next, p)) {
            // Nope, someone got there first, try again with the new p->next,
            // and remember: p->next is automatically changed to the new value of head.
        }
        // Yup, great! Everything's done then.
    }

    // Used by consumers to claim items to process.
    int pop_work() {
        auto p = atomic_load(&head); // The item we want to process.
        int work = (p ? p->work : -1);

        // Do we get to change head to p->next?
        while (p && !atomic_compare_exchange_weak(&head, &p, p->next)) {
            // Nope, someone got there first, try again with the new p,
            // and remember: p is automatically changed to the new value of head.
            work = (p ? p->work : -1); // Make sure to update work as well!
        }
        // Yup, great! Everything's done then, return the new task.
        return work; // Returns -1 if list is empty.
    }
};


编辑:在谈话中解释了将shared_ptratomic_*功能结合使用的原因。简而言之:从链接列表中弹出一个项目可能会将其从遍历该列表的人员的下方删除,或者可能在同一内存地址(The ABA Problem)上分配了另一个节点。使用shared_ptr将确保所有旧阅读者都拥有对原始项目的有效引用。

正如Herb解释的那样,这使得实现弹出功能变得微不足道。

关于c++ - 将工作项添加到数组或列表的非阻塞方式,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/30019413/

10-11 22:43
查看更多