本文介绍了有效地等待线程池中的所有任务完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前在我的线程池中有一个带有 x 个工作者的程序.在主循环中,将 y 任务分配给工作人员完成,但是在发出任务之后,我必须等待所有任务完成后才能继续执行程序.我认为我当前的解决方案效率低下,必须有一种更好的方法来等待所有任务完成,但是我不确定该怎么做

I currently have a program with x workers in my threadpool. During the main loop y tasks are assigned to the workers to complete, but after the tasks are sent out I must wait for all tasks for finish before preceding with the program. I believe my current solution is inefficient, there must be a better way to wait for all tasks to finish but I am not sure how to go about this

// called in main after all tasks are enqueued to
// std::deque<std::function<void()>> tasks
void ThreadPool::waitFinished()
{
    while(!tasks.empty()) //check if there are any tasks in queue waiting to be picked up
    {
        //do literally nothing
    }
}


更多信息:


More information:

线程池结构

//worker thread objects
class Worker {
    public:
        Worker(ThreadPool& s): pool(s) {}
        void operator()();
    private:
        ThreadPool &pool;
};

//thread pool
class ThreadPool {
    public:
        ThreadPool(size_t);
        template<class F>
        void enqueue(F f);
        void waitFinished();
        ~ThreadPool();
    private:
        friend class Worker;
        //keeps track of threads so we can join
        std::vector< std::thread > workers;
        //task queue
        std::deque< std::function<void()> > tasks;
        //sync
        std::mutex queue_mutex;
        std::condition_variable condition;
        bool stop;
};

或这是我threadpool.hpp的要点

or here's a gist of my threadpool.hpp

我要用于waitFinished()的示例:

while(running)
    //....
    for all particles alive
        push particle position function to threadpool
    end for

    threadPool.waitFinished();

    push new particle position data into openGL buffer
end while

这样,我可以并行发送成千上万的粒子位置任务,等待它们完成并将新数据放入openGL位置缓冲区中

so this way I can send hundrends of thousands of particle position tasks to be done in parallel, wait for them to finish and put the new data inside the openGL position buffers

推荐答案

这是完成您要尝试的一种方法.除非您知道内部正在发生什么,否则不要在同一个互斥锁上使用两个条件变量.除了显示每次运行之间要完成多少项的愿望外,我不需要原子处理的成员.

This is one way to do what you're trying. Using two condition variables on the same mutex is not for the light-hearted unless you know what is going on internally. I didn't need the atomic processed member other than my desire to demonstrate how many items were finished between each run.

其中的示例工作负载函数会生成一百万个随机int值,然后对它们进行排序(必须以一种或另一种方式加热我的办公室). waitFinished直到队列为空 没有线程繁忙时才会返回.

The sample workload function in this generates one million random int values, then sorts them (gotta heat my office one way or another). waitFinished will not return until the queue is empty and no threads are busy.

#include <iostream>
#include <deque>
#include <functional>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <random>

//thread pool
class ThreadPool
{
public:
    ThreadPool(unsigned int n = std::thread::hardware_concurrency());

    template<class F> void enqueue(F&& f);
    void waitFinished();
    ~ThreadPool();

    unsigned int getProcessed() const { return processed; }

private:
    std::vector< std::thread > workers;
    std::deque< std::function<void()> > tasks;
    std::mutex queue_mutex;
    std::condition_variable cv_task;
    std::condition_variable cv_finished;
    std::atomic_uint processed;
    unsigned int busy;
    bool stop;

    void thread_proc();
};

ThreadPool::ThreadPool(unsigned int n)
    : busy()
    , processed()
    , stop()
{
    for (unsigned int i=0; i<n; ++i)
        workers.emplace_back(std::bind(&ThreadPool::thread_proc, this));
}

ThreadPool::~ThreadPool()
{
    // set stop-condition
    std::unique_lock<std::mutex> latch(queue_mutex);
    stop = true;
    cv_task.notify_all();
    latch.unlock();

    // all threads terminate, then we're done.
    for (auto& t : workers)
        t.join();
}

void ThreadPool::thread_proc()
{
    while (true)
    {
        std::unique_lock<std::mutex> latch(queue_mutex);
        cv_task.wait(latch, [this](){ return stop || !tasks.empty(); });
        if (!tasks.empty())
        {
            // got work. set busy.
            ++busy;

            // pull from queue
            auto fn = tasks.front();
            tasks.pop_front();

            // release lock. run async
            latch.unlock();

            // run function outside context
            fn();
            ++processed;

            latch.lock();
            --busy;
            cv_finished.notify_one();
        }
        else if (stop)
        {
            break;
        }
    }
}

// generic function push
template<class F>
void ThreadPool::enqueue(F&& f)
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    tasks.emplace_back(std::forward<F>(f));
    cv_task.notify_one();
}

// waits until the queue is empty.
void ThreadPool::waitFinished()
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    cv_finished.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
}

// a cpu-busy task.
void work_proc()
{
    std::random_device rd;
    std::mt19937 rng(rd());

    // build a vector of random numbers
    std::vector<int> data;
    data.reserve(100000);
    std::generate_n(std::back_inserter(data), data.capacity(), [&](){ return rng(); });
    std::sort(data.begin(), data.end(), std::greater<int>());
}

int main()
{
    ThreadPool tp;

    // run five batches of 100 items
    for (int x=0; x<5; ++x)
    {
        // queue 100 work tasks
        for (int i=0; i<100; ++i)
            tp.enqueue(work_proc);

        tp.waitFinished();
        std::cout << tp.getProcessed() << '\n';
    }

    // destructor will close down thread pool
    return EXIT_SUCCESS;
}

输出

100
200
300
400
500

好运.

这篇关于有效地等待线程池中的所有任务完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 06:19