我只是在学习如何在c++ 11中使用std::thread。基本上,我使用的硬件中有一长串数据(想象一个在0-15000之间的for循环)和1568个线程。我想要一个单独的线程来处理每个样本。我了解如何创建第一个1568线程,它工作正常。但是一旦到达N_thread + 1示例,我便要检查是否有可用线程。如果存在,则将该数据样本发送到该线程。每个线程都发送到互斥锁功能,该功能最后会解锁。也许我误解了线程是如何工作的,不能以这种方式做事?也许有更好的线程/CPU分配库可以提供帮助?

就像我说的,我可以说创建了1568个线程,然后运行和联接,最终结果很好。只需要更多信息。

这是我的主要

int main(){
  cout<<"In main"<<endl;
  CSVReaderUpdatedStructure reader("data.csv");
  vector<STMDataPacket> DataList = reader.GetData();

  thread_pool Pool(THREAD_COUNT);
  auto startT0 = chrono::high_resolution_clock::now();
   for(unsigned s=0; s<DataList.size()-1; s++){
      cout<<"analysing sample "<<s<<endl;
      auto done = Pool.add_task([s= s, Sample= DataList[s], t_inf = time_info,wf=writefile, f=factor]{GetDMWPulses(s, Sample, t_inf, wf,f);});
      done.wait();

    }

  auto stop = chrono::high_resolution_clock::now();
  cout<<"pulses "<<pulses.size()<<endl;
  auto duration = chrono::duration_cast<chrono::microseconds>(stop - startT0);
  cout <<"time for MWD full process = "<< duration.count() <<" microseconds "<< endl;

  return 0;

}

最佳答案

您可能不想要1568个线程。您可能需要1568多个任务。

您可能想要一个线程池。 TBB有一个线程池,并且几乎在所有平台上都可用。

编写自己的线程池并不难。这是一个草图:

template<class T>
struct threadsafe_queue {
  optional<T> pop() {
    auto l = lock();
    cv.wait( l, [&]{
      return abort || !data.empty();
    });
    if (abort) return {};
    T retval = std::move(data.front());
    data.pop();
    return retval;
  }
  void push( T in ) {
    auto l = lock();
    data.push( std::move(in) );
    cv.notify_one();
  }
  void abort_queue() {
    auto l = lock();
    abort = true;
    cv.notify_all();
  }
private:
  mutable std::mutex m;
  std::condition_variable cv;
  std::queue<T> data;
  bool abort = false;

  std::unique_lock<std::mutex> lock() const {
    return std::unique_lock<std::mutex>(m);
  }
};

struct thread_pool {
  template<class F, class R=typename std::decay< typename std::result_of< F&() >::type>::type>
  auto add_task( F&& f )
  -> std::future< R >
  {
     std::packaged_task<R()> task( std::forward<F>(f) );
     auto retval = task.get_future();
     tasks.push( std::packaged_task<void()>(std::move(task)) );
     return retval;
  }

  void start_thread( std::size_t N=1 )
  {
    if (shutdown) return;
    for (std::size_t i = 0; i < N; ++i)
    {
      threads.emplace_back( [this]{
        while (true)
        {
          if(shutdown) return;
          auto task = tasks.pop();
          if (!task)
            return;
          (*task)();
        }
      } );
    }
  }
  void cleanup() {
    shutdown = true;
    tasks.abort_queue();
    for (auto&& t:threads)
      t.join();
    threads.clear();
  }
  ~thread_pool() {
    cleanup();
  }

  thread_pool():thread_pool( std::thread::hardware_concurrency() ) {}
  explicit thread_pool( std::size_t N ) {
    start_thread(N);
  }
private:
  threadsafe_queue<std::packaged_task<void()>> tasks;
  std::vector<std::thread> threads;
  std::atomic<bool> shutdown = false;
};

现在创建一个thread_pool

将任务插入其中。拿出 future 。

让工作人员任务增加std::atomic<unsigned int>并等待其达到最大值,或者做一些更奇特的事情。
struct counting_barrier {
  explicit counting_barrier( std::size_t n ):count(n) {}
  void operator--() {
    --count;
    if (count <= 0)
    {
       std::unique_lock<std::mutex> l(m);
       cv.notify_all();
    }
  }
  void wait() {
    std::unique_lock<std::mutex> l(m);
    cv.wait( l, [&]{ return count <= 0; } );
  }
private:
  std::mutex m;
  std::condition_variable cv;
  std::atomic<std::ptrdiff_t> count = 0;
};

创建一个counting_barrier barrier( 15000 )或其他。完成后的线程可以--barrier(它是线程安全的)。主线程可以是barrier.wait(),当调用15000 --时,它将被唤醒。

上面的代码可能有错别字,但设计合理。为了提高工业实力,您还希望有一个更好的关机程序。

Live example

如果没有可选或增强可选,请使用以下命令:
template<class T>
struct optional {
  T* get() { return static_cast<T*>( static_cast<void*>( & data ) ); };
  T const* get() const { return static_cast<T*>( static_cast<void*>( & data ) ); };

  T& operator*() & { return *get(); }
  T&& operator*() && { return std::move(*get()); }
  T const& operator*() const & { return *get(); }
  T const&& operator*() const&& { return std::move(*get()); }

  explicit operator bool() const { return engaged; }
  bool has_value() const { return (bool)*this; }
  template< class U >
  T value_or( U&& default_value ) const& {
    if (*this) return **this;
    return std::forward<U>(default_value);
  }
  template< class U >
  T value_or( U&& default_value ) && {
    if (*this) return std::move(**this);
    return std::forward<U>(default_value);
  }

  optional(T const& t) {
    emplace(t);
  }
  optional(T&& t) {
    emplace(std::move(t));
  }
  optional() = default;
  optional(optional const& o) {
    if (o) {
      emplace( *o );
    }
  }
  optional(optional && o) {
    if (o) {
      emplace( std::move(*o) );
    }
  }
  optional& operator=(optional const& o) & {
    if (!o) {
      reset();
    } else if (*this) {
      **this = *o;
    } else {
      emplace( *o );
    }
    return *this;
  }
  optional& operator=(optional && o) & {
    if (!o) {
      reset();
    } else if (*this) {
      **this = std::move(*o);
    } else {
      emplace( std::move(*o) );
    }
    return *this;
  }
  template<class...Args>
  T& emplace(Args&&...args) {
    if (*this) reset();
    ::new( static_cast<void*>(&data) ) T(std::forward<Args>(args)...);
    engaged = true;
    return **this;
  }
  void reset() {
    if (*this) {
      get()->~T();
      engaged = false;
    }
  }
  ~optional() { reset(); }
private:
  using storage = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
  bool engaged = false;
  storage data;
};

请注意,此 optional 不是工业实力;我从字面上写它,并且没有测试它。它缺少真正的选装件具有的许多工业强度功能。但是您可以将一个真正的可选替换为它的位置,并获得几乎相同或更好的行为,因此,如果您缺少一个,则可以使用它。
counting_barrier barrier(100);
thread_pool p(10);
for (int i = 0; i < 100; ++i)
{
  p.add_task([&barrier,i]{
    std::stringstream ss;
    ss << i << ",";
    std::cout << ss.str();
    --barrier;
  });
}
barrier.wait();
std::cout << "\n";
auto done1 = p.add_task([]{ std::cout << "hello" << std::endl; });
done1.wait();
auto done2 = p.add_task([]{ std::cout << "world" << std::endl; });
done2.wait();

10-04 14:57