我想使用QVector函数操作QtConcurrent::map。我的示例程序所做的就是将QVector中的所有值加1。

QVector<double> arr(10000000, 0);
QElapsedTimer timer;
qDebug() << QThreadPool::globalInstance()->maxThreadCount() << "Threads";

int end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
    std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; });
}
end = timer.elapsed();
qDebug() << end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
    std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
}
end = timer.elapsed();
qDebug() << end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
    QFuture<void> qf = QtConcurrent::map(arr.begin(), arr.end(), [](double &x){ ++x; });
    qf.waitForFinished();
}
end = timer.elapsed();
qDebug() << end;

但是程序输出
4 Threads
905 // std::transform
886 // std::for_each
876 // QtConcurrent::map

因此,多线程版本几乎没有速度优势。我验证了实际上有4个线程在运行。我使用-O2优化。更常见的QThreadPool方法是否更适合这种情况?

编辑:

我尝试了使用QtConcurrent::run()的不同方法。以下是程序代码的相关部分:
void add1(QVector<double>::iterator first, QVector<double>::iterator last) {
    for(; first != last; ++first) {
        *first += 1;
    }
}

/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
QFuture<void> qf[numThreads];
for(int j = 0; j < numThreads; ++j) {
    qf[j] = QtConcurrent::run(add1, arr.begin()+j*n/numThreads, arr.begin()+(j+1)*n/numThreads-1);
}
for(int j = 0; j < numThreads; ++j) {
    qf[j].waitForFinished();
}

因此,我将任务手动分配到不同的线程上。但是仍然很难提高性能:
181 ms // std::for_each
163 ms // QtConcurrent::run

这里还有什么问题?

最佳答案

与仅使用C++线程原语和roll-your-own-thread-pools相比,QtConcurrent似乎具有较高的开销。

template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.back());
    data.pop_back();
    return std::move(r);
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};
struct thread_pool {
  thread_pool( std::size_t n = 1 ) { start_thread(n); }
  thread_pool( thread_pool&& ) = delete;
  thread_pool& operator=( thread_pool&& ) = delete;
  ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R()> p(std::move(task));
    auto r = p.get_future();
    tasks.push_back( std::move(p) );
    return r;
  }
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> run_task( F task ) {
    if (threads_active() >= total_threads()) {
      start_thread();
    }
    return queue_task( std::move(task) );
  }
  void terminate() {
    tasks.terminate();
  }
  std::size_t threads_active() const {
    return active;
  }
  std::size_t total_threads() const {
    return threads.size();
  }
  void clear_threads() {
    terminate();
    threads.clear();
  }
  void start_thread( std::size_t n = 1 ) {
    while(n-->0) {
      threads.push_back(
        std::async( std::launch::async,
          [this]{
            while(auto task = tasks.pop_front()) {
              ++active;
              try{
                (*task)();
              } catch(...) {
                --active;
                throw;
              }
              --active;
            }
          }
        )
      );
    }
  }
private:
  std::vector<std::future<void>> threads;
  threaded_queue<std::packaged_task<void()>> tasks;
  std::atomic<std::size_t> active = {};
};

struct my_timer_t {
    std::chrono::high_resolution_clock::time_point first;
    std::chrono::high_resolution_clock::duration duration;

    void start() {
        first = std::chrono::high_resolution_clock::now();
    }
    std::chrono::high_resolution_clock::duration finish() {
        return duration = std::chrono::high_resolution_clock::now()-first;
    }
    unsigned long long ms() const {
        return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
    }
};
int main() {
    std::vector<double> arr(1000000, 0);
    my_timer_t timer;

    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    timer.start();
    for(int i = 0; i < 100; ++i) {
        std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; });
    }
    timer.finish();
    auto time_transform = timer.ms();
    std::cout << time_transform << "<- std::transform (" << arr[rand()%arr.size()] << ")\n";
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    timer.start();
    for(int i = 0; i < 100; ++i) {
        std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
    }
    timer.finish();
    auto time_for_each = timer.ms();
    std::cout << time_for_each << "<- std::for_each (" << arr[rand()%arr.size()] << ")\n";
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    enum { num_threads = 8 };
    thread_pool pool(num_threads);
    timer.start();
    for(int i = 0; i < 100; ++i) {
        std::array< std::future<void>, num_threads > tasks;
        for (int t = 0; t < num_threads; ++t) {
            tasks[t] = pool.run_task([&,t]{
                std::for_each( arr.begin()+(arr.size()/num_threads)*t, arr.begin()+(arr.size()/num_threads)*(t+1), [](double& x){++x;} );
            });
        }
        // std::cout << "loop! -- " << pool.threads_active() << "/" << pool.total_threads() << std::endl;
        for (int t = 0; t < num_threads; ++t)
            tasks[t].wait();
    }
    timer.finish();
    auto time_pool = timer.ms();
    std::cout << time_pool << "<- thread_pool (" << arr[rand()%arr.size()] << ")\n";
}

Live example

这将产生:
153<- std::transform (100)
131<- std::for_each (200)
82<- thread_pool (300)

使用简单的C++ 11线程池以8种方式拆分任务时,可显着提高速度。 (将任务拆分为4种方式时大约为105)。

现在,我的确使用了比您小的十倍的测试集,因为当我的程序花了这么长时间运行时,在线系统超时了。

与您的线程池系统进行通信会产生开销,但是我的幼稚线程池不应表现出像这样的真实库。

现在,一个严重的问题是您可能受到内存IO的约束;如果所有人都必须等待字节,则更快地访问更多字节的线程将无济于事。

10-07 17:31