我想使用QVector
函数操作QtConcurrent::map
。我的示例程序所做的就是将QVector
中的所有值加1。
QVector<double> arr(10000000, 0);
QElapsedTimer timer;
qDebug() << QThreadPool::globalInstance()->maxThreadCount() << "Threads";
int end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; });
}
end = timer.elapsed();
qDebug() << end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
}
end = timer.elapsed();
qDebug() << end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
QFuture<void> qf = QtConcurrent::map(arr.begin(), arr.end(), [](double &x){ ++x; });
qf.waitForFinished();
}
end = timer.elapsed();
qDebug() << end;
但是程序输出
4 Threads
905 // std::transform
886 // std::for_each
876 // QtConcurrent::map
因此,多线程版本几乎没有速度优势。我验证了实际上有4个线程在运行。我使用-O2优化。更常见的
QThreadPool
方法是否更适合这种情况?编辑:
我尝试了使用
QtConcurrent::run()
的不同方法。以下是程序代码的相关部分:void add1(QVector<double>::iterator first, QVector<double>::iterator last) {
for(; first != last; ++first) {
*first += 1;
}
}
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
QFuture<void> qf[numThreads];
for(int j = 0; j < numThreads; ++j) {
qf[j] = QtConcurrent::run(add1, arr.begin()+j*n/numThreads, arr.begin()+(j+1)*n/numThreads-1);
}
for(int j = 0; j < numThreads; ++j) {
qf[j].waitForFinished();
}
因此,我将任务手动分配到不同的线程上。但是仍然很难提高性能:
181 ms // std::for_each
163 ms // QtConcurrent::run
这里还有什么问题?
最佳答案
与仅使用C++线程原语和roll-your-own-thread-pools相比,QtConcurrent似乎具有较高的开销。
template<class T>
struct threaded_queue {
using lock = std::unique_lock<std::mutex>;
void push_back( T t ) {
{
lock l(m);
data.push_back(std::move(t));
}
cv.notify_one();
}
boost::optional<T> pop_front() {
lock l(m);
cv.wait(l, [this]{ return abort || !data.empty(); } );
if (abort) return {};
auto r = std::move(data.back());
data.pop_back();
return std::move(r);
}
void terminate() {
{
lock l(m);
abort = true;
data.clear();
}
cv.notify_all();
}
~threaded_queue()
{
terminate();
}
private:
std::mutex m;
std::deque<T> data;
std::condition_variable cv;
bool abort = false;
};
struct thread_pool {
thread_pool( std::size_t n = 1 ) { start_thread(n); }
thread_pool( thread_pool&& ) = delete;
thread_pool& operator=( thread_pool&& ) = delete;
~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
template<class F, class R=std::result_of_t<F&()>>
std::future<R> queue_task( F task ) {
std::packaged_task<R()> p(std::move(task));
auto r = p.get_future();
tasks.push_back( std::move(p) );
return r;
}
template<class F, class R=std::result_of_t<F&()>>
std::future<R> run_task( F task ) {
if (threads_active() >= total_threads()) {
start_thread();
}
return queue_task( std::move(task) );
}
void terminate() {
tasks.terminate();
}
std::size_t threads_active() const {
return active;
}
std::size_t total_threads() const {
return threads.size();
}
void clear_threads() {
terminate();
threads.clear();
}
void start_thread( std::size_t n = 1 ) {
while(n-->0) {
threads.push_back(
std::async( std::launch::async,
[this]{
while(auto task = tasks.pop_front()) {
++active;
try{
(*task)();
} catch(...) {
--active;
throw;
}
--active;
}
}
)
);
}
}
private:
std::vector<std::future<void>> threads;
threaded_queue<std::packaged_task<void()>> tasks;
std::atomic<std::size_t> active = {};
};
struct my_timer_t {
std::chrono::high_resolution_clock::time_point first;
std::chrono::high_resolution_clock::duration duration;
void start() {
first = std::chrono::high_resolution_clock::now();
}
std::chrono::high_resolution_clock::duration finish() {
return duration = std::chrono::high_resolution_clock::now()-first;
}
unsigned long long ms() const {
return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
}
};
int main() {
std::vector<double> arr(1000000, 0);
my_timer_t timer;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; });
}
timer.finish();
auto time_transform = timer.ms();
std::cout << time_transform << "<- std::transform (" << arr[rand()%arr.size()] << ")\n";
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
}
timer.finish();
auto time_for_each = timer.ms();
std::cout << time_for_each << "<- std::for_each (" << arr[rand()%arr.size()] << ")\n";
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
enum { num_threads = 8 };
thread_pool pool(num_threads);
timer.start();
for(int i = 0; i < 100; ++i) {
std::array< std::future<void>, num_threads > tasks;
for (int t = 0; t < num_threads; ++t) {
tasks[t] = pool.run_task([&,t]{
std::for_each( arr.begin()+(arr.size()/num_threads)*t, arr.begin()+(arr.size()/num_threads)*(t+1), [](double& x){++x;} );
});
}
// std::cout << "loop! -- " << pool.threads_active() << "/" << pool.total_threads() << std::endl;
for (int t = 0; t < num_threads; ++t)
tasks[t].wait();
}
timer.finish();
auto time_pool = timer.ms();
std::cout << time_pool << "<- thread_pool (" << arr[rand()%arr.size()] << ")\n";
}
Live example。
这将产生:
153<- std::transform (100)
131<- std::for_each (200)
82<- thread_pool (300)
使用简单的C++ 11线程池以8种方式拆分任务时,可显着提高速度。 (将任务拆分为4种方式时大约为105)。
现在,我的确使用了比您小的十倍的测试集,因为当我的程序花了这么长时间运行时,在线系统超时了。
与您的线程池系统进行通信会产生开销,但是我的幼稚线程池不应表现出像这样的真实库。
现在,一个严重的问题是您可能受到内存IO的约束;如果所有人都必须等待字节,则更快地访问更多字节的线程将无济于事。