我已经有了Worker
类和Handler
类,已经为作业创建了一个抽象层。我想使用std::async
在混合中注入(inject)一些异步性,但是我的Visual Studio 2012(更新1)中出现了一些奇怪的行为。
我的类(class)层次结构如下:
Worker
是一个抽象类,其中Init
和Work
作为纯虚拟方法。 BasicWorker : Worker
只是将printf
用于某些输出。 GroupWorker : Worker
是其他工作人员的集合。 Handler
保留Worker
来完成某些工作。 然后,我调用几个
std::async
方法,在其中创建工作程序和处理程序,在嵌套的std::async
调用中调用处理程序,然后等待工作程序的初始化(此处为std::condition_variable
),然后停止处理程序。最后,我等待所有
std::future
结束。代码如下:
#include <stdio.h>
#include <future>
#include <array>
#include <atomic>
#include <vector>
struct Worker
{
virtual ~Worker() { }
virtual void Init() = 0;
virtual void Work() = 0;
};
struct BasicWorker : public Worker
{
virtual ~BasicWorker() { }
virtual void Init()
{
printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
}
virtual void Work()
{
printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
}
};
struct GroupWorker : public Worker
{
GroupWorker()
{
workers.push_back(std::make_shared<BasicWorker>());
}
virtual ~GroupWorker() { }
virtual void Init()
{
for(int i = 0; i < workers.size(); ++i)
{
workers[i]->Init();
}
initEvent.notify_all();
}
virtual void Work()
{
for(int i = 0; i < workers.size(); ++i)
{
workers[i]->Work();
}
}
void WaitForInit()
{
//std::unique_lock<std::mutex> initLock(initMutex);
//initEvent.wait(initLock);
}
private:
std::mutex initMutex;
std::condition_variable initEvent;
std::vector<std::shared_ptr<Worker>> workers;
};
struct Handler
{
static const int Stopped = -1;
static const int Ready = 0;
static const int Running = 1;
Handler(const std::shared_ptr<Worker>& worker) :
worker(worker)
{ }
void Start(int count)
{
int readyValue = Ready;
if(working.compare_exchange_strong(readyValue, Running))
{
worker->Init();
for(int i = 0; i < count && working == Running; ++i)
{
worker->Work();
}
}
}
void Stop()
{
working = Stopped;
}
private:
std::atomic<int> working;
std::shared_ptr<Worker> worker;
};
std::future<void> Start(int jobIndex, int runCount)
{
//printf("Start: %d\n", jobIndex);
return std::async(std::launch::async, [=]()
{
printf("Async: %d\n", jobIndex);
auto worker = std::make_shared<GroupWorker>();
auto handler = std::make_shared<Handler>(worker);
auto result = std::async(std::launch:async, [=]()
{
printf("Nested async: %d\n", jobIndex);
handler->Start(runCount);
});
worker->WaitForInit();
handler->Stop();
result.get();
});
}
int main()
{
const int JobCount = 300;
const int RunCount = 5;
std::array<std::future<void>, JobCount> jobs;
for(int i = 0; i < JobCount; ++i)
{
jobs[i] = Start(i, RunCount);
}
for(int i = 0; i < JobCount; ++i)
{
jobs[i].get();
}
}
我的问题是:
WaitForInit@GroupWorker
函数中的行的注释,则直到进行了所有第一级异步函数调用之后,才会进行嵌套的异步函数调用std::condition_variable
时,创建新线程的感觉就像是指数增长。对于我的100个以下工作的试用版,存在一些异步性,但300个以上存在创建工作的完全顺序性。 printf
方法中的Start
行,则所有嵌套的异步工作都像一个魅力所以,
std::condition_variable
时我做错了什么? printf
有什么关系? (在竞争情况下,我尝试删除所有printf
调用,并且在代码中放置了一个断点,但没有帮助。对于std::cout
也是相同的情况)编辑:
我添加了启动策略(如Jonathan Wakely所建议),以确保创建线程。但这也无济于事。我当前正在创建
std::thread
并调用thread::join
函数以在第一级异步内部等待。 最佳答案
N.B.可以调用printf
,但不能假定std::thread::id
可转换为int
。您可以像这样使它更具可移植性:
inline long tol(std::thread::id id)
{
std::ostringstream ss;
ss << id;
return stol(ss.str());
}
(这仍然假设
std::thread::id
的字符串值可以转换为long
,这不是必需的,但是比假设隐式转换为int
的可能性更大)您没有等待的“条件”,也没有同步以确保对
notify_all
的调用发生在对wait
的调用之前。您应该有一个由Init
设置的成员变量,其内容为“此工作人员已被初始化”,并且仅在条件变量不为真时等待条件变量(该标志应该是原子的或由互斥体保护,以防止数据争用)。由于存在数百个线程,因此对共享资源的争用很多,并且对OS调度程序的压力很大,因此该实现可能决定开始返回延迟函数(即好像
std::async
用std::launch::deferred
调用了)而不是异步函数。您的代码假定async
不会返回延迟函数,因为如果异步工作程序及其嵌套的异步工作程序都作为延迟函数运行,则程序可能会死锁,因为外部函数会阻止等待嵌套函数调用Init
,但嵌套函数永远不会运行直到第一个调用result.get()
为止。您的程序不可移植,并且只能在Windows上运行,因为(如果我理解正确),MSVC async
使用工作窃取线程池,如果有可用线程,该池将运行延迟功能。 这不是标准要求的。 如果要强制每个工作人员拥有一个新线程,请使用std::launch::async
策略。这会带来一些延迟,并且可能在线程之间造成某种形式的不可靠排序,因为它们现在正在争用(甚至可能在争夺)单个全局资源。
printf
施加的延迟可能足以使一个线程完成,这将其资源释放到线程池中并允许另一个异步工作程序运行。关于c++ - 在Visual Studio 2012中将嵌套的std::async与std::condition_variable一起使用,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/13797368/