我已经有了Worker类和Handler类,已经为作业创建了一个抽象层。我想使用std::async在混合中注入(inject)一些异步性,但是我的Visual Studio 2012(更新1)中出现了一些奇怪的行为。

我的类(class)层次结构如下:

  • Worker是一个抽象类,其中InitWork作为纯虚拟方法。
  • BasicWorker : Worker只是将printf用于某些输出。
  • GroupWorker : Worker是其他工作人员的集合。
  • Handler保留Worker来完成某些工作。

  • 然后,我调用几个std::async方法,在其中创建工作程序和处理程序,在嵌套的std::async调用中调用处理程序,然后等待工作程序的初始化(此处为std::condition_variable),然后停止处理程序。

    最后,我等待所有std::future结束。

    代码如下:
    #include <stdio.h>
    #include <future>
    #include <array>
    #include <atomic>
    #include <vector>
    
    struct Worker
    {
        virtual ~Worker() { }
        virtual void Init() = 0;
        virtual void Work() = 0;
    };
    
    struct BasicWorker : public Worker
    {
        virtual ~BasicWorker() { }
        virtual void Init()
        {
            printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
        }
    
        virtual void Work()
        {
            printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
        }
    };
    
    struct GroupWorker : public Worker
    {
        GroupWorker()
        {
            workers.push_back(std::make_shared<BasicWorker>());
        }
    
        virtual ~GroupWorker() { }
    
        virtual void Init()
        {
            for(int i = 0; i < workers.size(); ++i)
            {
                workers[i]->Init();
            }
            initEvent.notify_all();
        }
    
        virtual void Work()
        {
            for(int i = 0; i < workers.size(); ++i)
            {
                workers[i]->Work();
            }
        }
    
        void WaitForInit()
        {
            //std::unique_lock<std::mutex> initLock(initMutex);
            //initEvent.wait(initLock);
        }
    private:
        std::mutex initMutex;
        std::condition_variable initEvent;
        std::vector<std::shared_ptr<Worker>> workers;
    };
    
    struct Handler
    {
        static const int Stopped = -1;
        static const int Ready = 0;
        static const int Running = 1;
    
        Handler(const std::shared_ptr<Worker>& worker) :
            worker(worker)
        { }
    
        void Start(int count)
        {
            int readyValue = Ready;
            if(working.compare_exchange_strong(readyValue, Running))
            {
                worker->Init();
    
                for(int i = 0; i < count && working == Running; ++i)
                {
                    worker->Work();
                }
            }
        }
    
        void Stop()
        {
            working = Stopped;
        }
    private:
        std::atomic<int> working;
        std::shared_ptr<Worker> worker;
    };
    
    std::future<void> Start(int jobIndex, int runCount)
    {
        //printf("Start: %d\n", jobIndex);
        return std::async(std::launch::async, [=]()
        {
            printf("Async: %d\n", jobIndex);
            auto worker = std::make_shared<GroupWorker>();
            auto handler = std::make_shared<Handler>(worker);
    
            auto result = std::async(std::launch:async, [=]()
            {
                printf("Nested async: %d\n", jobIndex);
                handler->Start(runCount);
            });
    
            worker->WaitForInit();
            handler->Stop();
    
            result.get();
        });
    }
    
    int main()
    {
        const int JobCount = 300;
        const int RunCount =  5;
        std::array<std::future<void>, JobCount> jobs;
    
        for(int i = 0; i < JobCount; ++i)
        {
            jobs[i] = Start(i, RunCount);
        }
    
        for(int i = 0; i < JobCount; ++i)
        {
            jobs[i].get();
        }
    }
    

    我的问题是:
  • 如果我取消对WaitForInit@GroupWorker函数中的行的注释,则直到进行了所有第一级异步函数调用之后,才会进行嵌套的异步函数调用
  • 如果我增加了作业数量,则在等待std::condition_variable时,创建新线程的感觉就像是指数增长。对于我的100个以下工作的试用版,存在一些异步性,但300个以上存在创建工作的完全顺序性。
  • 然后,如果我取消注释printf方法中的Start行,则所有嵌套的异步工作都像一个魅力

  • 所以,
  • 在使用std::condition_variable时我做错了什么?
  • 为什么创建作业的速度降低了100个线程? (此问题是可选的,似乎是操作系统的问题,可以使用智能线程池概念进行修复)
  • 这与printf有什么关系? (在竞争情况下,我尝试删除所有printf调用,并且在代码中放置了一个断点,但没有帮助。对于std::cout也是相同的情况)

  • 编辑:
    我添加了启动策略(如Jonathan Wakely所建议),以确保创建线程。但这也无济于事。我当前正在创建std::thread并调用thread::join函数以在第一级异步内部等待。

    最佳答案

    N.B.可以调用printf,但不能假定std::thread::id可转换为int。您可以像这样使它更具可移植性:

    inline long tol(std::thread::id id)
    {
      std::ostringstream ss;
      ss << id;
      return stol(ss.str());
    }
    

    (这仍然假设std::thread::id的字符串值可以转换为long,这不是必需的,但是比假设隐式转换为int的可能性更大)



    您没有等待的“条件”,也没有同步以确保对notify_all的调用发生在对wait的调用之前。您应该有一个由Init设置的成员变量,其内容为“此工作人员已被初始化”,并且仅在条件变量不为真时等待条件变量(该标志应该是原子的或由互斥体保护,以防止数据争用)。



    由于存在数百个线程,因此对共享资源的争用很多,并且对OS调度程序的压力很大,因此该实现可能决定开始返回延迟函数(即好像std::asyncstd::launch::deferred调用了)而不是异步函数。您的代码假定async不会返回延迟函数,因为如果异步工作程序及其嵌套的异步工作程序都作为延迟函数运行,则程序可能会死锁,因为外部函数会阻止等待嵌套函数调用Init,但嵌套函数永远不会运行直到第一个调用result.get()为止。您的程序不可移植,并且只能在Windows上运行,因为(如果我理解正确),MSVC async使用工作窃取线程池,如果有可用线程,该池将运行延迟功能。 这不是标准要求的。 如果要强制每个工作人员拥有一个新线程,请使用std::launch::async策略。



    这会带来一些延迟,并且可能在线程之间造成某种形式的不可靠排序,因为它们现在正在争用(甚至可能在争夺)单个全局资源。 printf施加的延迟可能足以使一个线程完成,这将其资源释放到线程池中并允许另一个异步工作程序运行。

    关于c++ - 在Visual Studio 2012中将嵌套的std::async与std::condition_variable一起使用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13797368/

    10-11 22:23
    查看更多