我正在尝试packaged_task创建生产者-消费者模式
代码如下:
test_thread9_producer1test_thread9_producer2将任务推入队列
test_thread9_consumer1从队列中检索任务以执行

但是,在运行test_thread9时,它可以正确执行任务,但会出现调试错误:已调用中止。我不确定为什么吗?谁能帮助我进一步了解packaged_task

第二个问题:使用者正在运行while(1)循环,我想不到优美的方式
当两个生产者将所有任务推入队列并且test_thread9_consumer1完成执行队列中的所有任务时,让test_thread9_consumer1退出。谁能给我一些建议?

void test_thread9()
{
    std::thread t1(test_thread9_producer1);
    std::thread t2(test_thread9_producer2);
    std::thread t3(test_thread9_consumer1);

    t1.join();
    t2.join();
    t3.join();
}

std::deque<std::packaged_task<int()>>task_q;
std::mutex lock9;

int factial_calc2(int in)
{
    int ret = 1;
    for (int i = in; i > 1; i--)
    {
        ret = ret*i;
    }
    std::lock_guard<std::mutex> locker(lock9);
    std::cout << "input is " << in << "result is " << ret << std::endl;
    return ret;
}

void test_thread9_producer1()
{
    for (int i = 0; i < 10; i = i + 2)
    {
        std::lock_guard<std::mutex> locker(lock9);
        std::packaged_task<int()> t1(std::bind(factial_calc2, i));
        task_q.push_back(std::move(t1));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void test_thread9_producer2()
{
    for (int i = 1; i < 10; i = i + 2)
    {
        std::lock_guard<std::mutex> locker(lock9);
        std::packaged_task<int()> t1(std::bind(factial_calc2, i));
        task_q.push_back(std::move(t1));
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}


void test_thread9_consumer1()
{
    std::packaged_task<int()>t;
    while (1)
    {
        {
            std::lock_guard<std::mutex> locker(lock9);
            if (!task_q.empty())
            {
                t = std::move(task_q.front());
                task_q.pop_front();
            }
        }
        t();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

最佳答案

为什么会崩溃?

如果您的使用者线程发现一个空队列,尽管它没有移动,它仍将尝试执行打包的任务。这是UB,因此是运行时错误!

您可以通过检查packaged_task是否为valid来改善此情况:

while (1)
{
    std::packaged_task<int()>t;  // to make sure that valid() checks this iteration
    {
       ...
    }
    if (t.valid())
        t();  // execute only if it's a valid task
    ...
}


如何避免无休止的循环?

您可以通过某种方式跟踪运行情况。一种简单的技术是使用atomic变量来管理共享状态信息(可以并发访问而无需锁定)。

例如,您可以数出成品生产者的数目

std::atomic<int>finished{0};  // count the producers that are finished
...


无效的test_thread9_producerN()
   {
        cout <
然后,您可以使您的消费者适应以下信息:

void test_thread9_consumer1()
{
    bool nothing_to_do{false};
    while (!nothing_to_do && finished<2)
    {
    ...
        nothing_to_do=task_q.empty();  // in the lock protected section
        if (!nothing_to_do)
    ...
    }
}


Online demo

关于c++ - 使用c++ packaged_task构建生产者-消费者模式,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38424140/

10-13 05:03