这个问题是关于在C++中实现以下模拟的最佳策略。
我正在尝试将模拟作为物理研究项目的一部分,该项目基本上跟踪空间中节点链的动力学。每个节点都包含一个位置以及某些参数(局部曲率,速度,到邻居的距离等),这些参数都会随着时间的推移而变化。
每个时间步骤都可以细分为以下四个部分:
(此外,节点可能会卡住,从而使它们在其余的模拟过程中处于不 Activity 状态。具有不 Activity 邻居的不 Activity 节点的局部参数将不会更改,并且不需要任何其他计算。)
每个节点包含〜60个字节,我在链中有〜100 000个节点,我需要在链中发展约〜1000000个时间步长。但是,我想最大化这些数字,因为这样可以提高模拟的准确性,但是要限制在合理的时间(〜小时)内完成模拟。 (约30%的节点将处于非 Activity 状态。)
我已经开始将这种模拟实现为C++中的双向链表。这似乎很自然,因为我需要在现有节点之间插入新节点,并且因为局部参数取决于最近的邻居。 (每次循环遍历整个链时,我都会向下一个 Activity 节点添加一个额外的指针,以避免不必要的计算)。
关于并行化(或与此相关的编码),我不是专家,但是我玩过OpenMP,我非常喜欢如何用两行代码来加快独立操作的循环。我不知道如何使我的链表并行运行,甚至不行(?)。所以我有使用STL vector的想法。我可以存储邻居的索引并通过标准查找访问它们,而不用指向最近的邻居。我还可以按链的位置(每第x个时间步长)对 vector 进行排序,以获得更好的内存局部性。这种方法将允许循环使用OpenMP方法。
我有点被这个想法吓到了,因为我不必处理内存管理。而且我猜想STL vector 实现比我处理列表中的Node的简单“new”和“delete”方式更好。我知道我可以用STL列表做同样的事情,但是我不喜欢用迭代器访问最近的邻居的方式。
因此,我问1337 h4x0r和熟练的程序员,对于我的仿真,哪种设计更好?上面概述的 vector 方法是一个好主意吗?还是有一些技巧可以使链接列表与OpenMP一起使用?还是我应该考虑一种完全不同的方法?
该模拟将在具有8核和48G RAM的计算机上运行,因此我想我可以用大量内存换取速度。
提前致谢
编辑:
我需要在每个步骤中添加1-2%的新节点,因此除非将每个步骤中的 vector 排序,否则将它们存储为没有索引到最近邻居的 vector 将不起作用。
最佳答案
这是一个经典的权衡问题。使用数组或std::vector将使计算更快,插入更慢;使用双向链表或std::list可以使插入速度更快,而计算速度则更慢。
判断权衡问题的唯一方法是凭经验。哪个将对您的特定应用程序更快地工作?您真正能做的就是双向尝试并查看。计算越密集,模板越短(例如,计算强度-每要带入的内存量您必须进行多少次触发器),标准阵列就越不重要。但是基本上,您应该同时用两种方法模拟基本计算的实现,然后看它是否重要。我用std::vector和std::list一起破解了一个非常粗糙的东西。它可能以多种方式都是错误的,但是您可以试一下其中的一些参数,看看哪个对您有利。在我的系统上,对于给定的大小和计算量,list更快,但是可以很容易地进行两种选择。
W / rt openmp,是的,如果那是您要采取的方式,那么您的双手会被束缚;您几乎肯定会使用 vector 结构,但是首先您应该确保插入的额外成本不会使多核受益。
#include <iostream>
#include <list>
#include <vector>
#include <cmath>
#include <sys/time.h>
using namespace std;
struct node {
bool stuck;
double x[2];
double loccurve;
double disttoprev;
};
void tick(struct timeval *t) {
gettimeofday(t, NULL);
}
/* returns time in seconds from now to time described by t */
double tock(struct timeval *t) {
struct timeval now;
gettimeofday(&now, NULL);
return (double)(now.tv_sec - t->tv_sec) +
((double)(now.tv_usec - t->tv_usec)/1000000.);
}
int main()
{
const int nstart = 100;
const int niters = 100;
const int nevery = 30;
const bool doPrint = false;
list<struct node> nodelist;
vector<struct node> nodevect;
// Note - vector is *much* faster if you know ahead of time
// maximum size of vector
nodevect.reserve(nstart*30);
// Initialize
for (int i = 0; i < nstart; i++) {
struct node *mynode = new struct node;
mynode->stuck = false;
mynode->x[0] = i; mynode->x[1] = 2.*i;
mynode->loccurve = -1;
mynode->disttoprev = -1;
nodelist.push_back( *mynode );
nodevect.push_back( *mynode );
}
const double EPSILON = 1.e-6;
struct timeval listclock;
double listtime;
tick(&listclock);
for (int i=0; i<niters; i++) {
// Calculate local curvature, distance
list<struct node>::iterator prev, next, cur;
double dx1, dx2, dy1, dy2;
next = cur = prev = nodelist.begin();
cur++;
next++; next++;
dx1 = prev->x[0]-cur->x[0];
dy1 = prev->x[1]-cur->x[1];
while (next != nodelist.end()) {
dx2 = cur->x[0]-next->x[0];
dy2 = cur->x[1]-next->x[1];
double slope1 = (dy1/(dx1+EPSILON));
double slope2 = (dy2/(dx2+EPSILON));
cur->disttoprev = sqrt(dx1*dx1 + dx2*dx2 );
cur->loccurve = ( slope1*slope2*(dy1+dy2) +
slope2*(prev->x[0]+cur->x[0]) -
slope1*(cur->x[0] +next->x[0]) ) /
(2.*(slope2-slope1) + EPSILON);
next++;
cur++;
prev++;
}
// Insert interpolated pt every neveryth pt
int count = 1;
next = cur = nodelist.begin();
next++;
while (next != nodelist.end()) {
if (count % nevery == 0) {
struct node *mynode = new struct node;
mynode->x[0] = (cur->x[0]+next->x[0])/2.;
mynode->x[1] = (cur->x[1]+next->x[1])/2.;
mynode->stuck = false;
mynode->loccurve = -1;
mynode->disttoprev = -1;
nodelist.insert(next,*mynode);
}
next++;
cur++;
count++;
}
}
51,0-1 40%
struct timeval vectclock;
double vecttime;
tick(&vectclock);
for (int i=0; i<niters; i++) {
int nelem = nodevect.size();
double dx1, dy1, dx2, dy2;
dx1 = nodevect[0].x[0]-nodevect[1].x[0];
dy1 = nodevect[0].x[1]-nodevect[1].x[1];
for (int elem=1; elem<nelem-1; elem++) {
dx2 = nodevect[elem].x[0]-nodevect[elem+1].x[0];
dy2 = nodevect[elem].x[1]-nodevect[elem+1].x[1];
double slope1 = (dy1/(dx1+EPSILON));
double slope2 = (dy2/(dx2+EPSILON));
nodevect[elem].disttoprev = sqrt(dx1*dx1 + dx2*dx2 );
nodevect[elem].loccurve = ( slope1*slope2*(dy1+dy2) +
slope2*(nodevect[elem-1].x[0] +
nodevect[elem].x[0]) -
slope1*(nodevect[elem].x[0] +
nodevect[elem+1].x[0]) ) /
(2.*(slope2-slope1) + EPSILON);
}
// Insert interpolated pt every neveryth pt
int count = 1;
vector<struct node>::iterator next, cur;
next = cur = nodevect.begin();
next++;
while (next != nodevect.end()) {
if (count % nevery == 0) {
struct node *mynode = new struct node;
mynode->x[0] = (cur->x[0]+next->x[0])/2.;
mynode->x[1] = (cur->x[1]+next->x[1])/2.;
mynode->stuck = false;
mynode->loccurve = -1;
mynode->disttoprev = -1;
nodevect.insert(next,*mynode);
}
next++;
cur++;
count++;
}
}
vecttime = tock(&vectclock);
cout << "Time for list: " << listtime << endl;
cout << "Time for vect: " << vecttime << endl;
vector<struct node>::iterator v;
list <struct node>::iterator l;
if (doPrint) {
cout << "Vector: " << endl;
for (v=nodevect.begin(); v!=nodevect.end(); ++v) {
cout << "[ (" << v->x[0] << "," << v->x[1] << "), " << v->disttoprev << ", " << v->loccurve << "] " << endl;
}
cout << endl << "List: " << endl;
for (l=nodelist.begin(); l!=nodelist.end(); ++l) {
cout << "[ (" << l->x[0] << "," << l->x[1] << "), " << l->disttoprev << ", " << l->loccurve << "] " << endl;
}
}
cout << "List size is " << nodelist.size() << endl;
}