当前,我正在使用OpenMP和C编程k-means ++的并行版本。到目前为止,我正在实现质心的初始化。如果您不熟悉此过程,则该过程大致类似于follows。给定带有dataset
点的n
(矩阵),就可以使用“概率函数”(也称为轮盘赌选择)来初始化k
形心。
假设您有n=4
点和以下距某些质心的距离数组:
distances = [2, 4, 6, 8]
dist_sum = 20
根据这些,通过将
distances
的每个条目除以dist_sum
并添加先前的结果来定义一个累积概率数组,如下所示:probs = [0.1, 0.2, 0.3, 0.4] = [2/20, 4/20, 6/20, 8/20]
acc_probs = [0.1, 0.3, 0.6, 1.0]
然后,执行轮盘选择。给定一个随机数,例如
r=0.5
,使用r
和acc_probs
选择下一个点,遍历acc_probs
直到r < acc_probs[i]
。在此示例中,所选点是i=2
,因为r < acc_probs[2]
。问题
在这种情况下,我正在处理非常大的矩阵(大约为点的
n=16 000 000
)。尽管该程序给出了正确的答案(即质心的良好初始化),但它的缩放比例并没有达到预期的效果。该函数根据该算法计算初始质心。 double **parallel_init_centroids (double **dataset, int n, int d, int k, RngStream randomizer, long int *total_ops) {
double dist=0, error=0, dist_sum=0, r=0, partial_sum=0, mindist=0;
int cn=0, cd=0, ck = 0, cck = 0, idx = 0;
ck = 0;
double probs_sum = 0; // debug
int mink=0, id=0, cp=0;
for (ck = 0; ck < k; ck++) {
if ( ck == 0 ) {
// 1. choose an initial centroid c_0 from dataset randomly
idx = RngStream_RandInt (randomizer, 0, n-1);
}
else {
// 2. choose a successive centroid c_{ck} using roulette selection
r = RngStream_RandU01 (randomizer);
idx = 0;
partial_sum = 0;
for (cn=0; cn<n; cn++) {
partial_sum = partial_sum + distances[cn]/dist_sum;
if (r < partial_sum) {
idx = cn;
break;
}
}
}
// 3. copy centroid from dataset
for (cd=0; cd<d; cd++)
centroids[ck][cd] = dataset[idx][cd];
// reset before parallel region
dist_sum = 0;
// -- parallel region --
# pragma omp parallel shared(distances, clusters, centroids, dataset, chunk, dist_sum_threads, total_ops_threads) private(id, cn, cck, cd, cp, error, dist, mindist, mink)
{
id = omp_get_thread_num();
dist_sum_threads[id] = 0; // each thread reset its entry
// parallel loop
// 4. recompute distances against centroids
# pragma omp for schedule(static,chunk)
for (cn=0; cn<n; cn++) {
mindist = DMAX;
mink = 0;
for (cck=0; cck<=ck; cck++) {
dist = 0;
for (cd=0; cd<d; cd++) {
error = dataset[cn][cd] - centroids[ck][cd];
dist = dist + (error * error); total_ops_threads[id]++;
}
if (dist < mindist) {
mindist = dist;
mink = ck;
}
}
distances[cn] = mindist;
clusters[cn] = mink;
dist_sum_threads[id] += mindist; // each thread contributes before reduction
}
}
// -- parallel region --
// 5. sequential reduction
dist_sum = 0;
for (cp=0; cp<p; cp++)
dist_sum += dist_sum_threads[cp];
}
// stats
*(total_ops) = 0;
for (cp=0; cp<p; cp++)
*(total_ops) += total_ops_threads[cp];
// free it later
return centroids;
}
如您所见,并行区域计算
n
d
-维度点与k
d
-维度质心之间的距离。这项工作在p
线程之间共享(最多32个)。并行区域完成后,将填充两个数组:distances
和dist_sum_threads
。第一个数组与前面的示例相同,而第二个数组包含每个线程收集的累积距离。考虑前面的示例,如果p=2
线程可用,则此数组的定义如下:dist_sum_threads[0] = 6 ([2, 4]) # filled by thread 0
dist_sum_threads[1] = 14 ([6, 8]) # filled by thread 1
dist_sum
是通过添加dist_sum_threads
的每个条目来定义的。该功能按预期工作,但是当线程数增加时,执行时间会增加。此figure显示了一些性能指标。我的实现有什么问题,尤其是openmp? 概括而言,仅使用了两种编译指示:
# pragma omp parallel ...
{
get thread id
# pragma omp for schedule(static,chunk)
{
compute distances ...
}
fill distances and dist_sum_threads[id]
}
换句话说,我消除了障碍,互斥访问以及其他可能导致额外开销的问题。但是,随着线程数量的增加,执行时间最糟糕。
更新
n=100000
点和k=16
重心之间的距离。 omp_get_wtime
测量执行时间。总时间存储在wtime_spent
中。 dist_sum
。但是,它不能按预期方式工作(下面注释为并行还原差)。 dist_sum
的正确值是999857108020.0
,但是,当使用p
线程计算它时,结果是999857108020.0 * p
,这是错误的。 double **parallel_compute_distances (double **dataset, int n, int d, int k, long int *total_ops) {
double dist=0, error=0, mindist=0;
int cn, cd, ck, mink, id, cp;
// reset before parallel region
dist_sum = 0;
// -- start time --
wtime_start = omp_get_wtime ();
// parallel loop
# pragma omp parallel shared(distances, clusters, centroids, dataset, chunk, dist_sum, dist_sum_threads) private(id, cn, ck, cd, cp, error, dist, mindist, mink)
{
id = omp_get_thread_num();
dist_sum_threads[id] = 0; // reset
// 2. recompute distances against centroids
# pragma omp for schedule(static,chunk)
for (cn=0; cn<n; cn++) {
mindist = DMAX;
mink = 0;
for (ck=0; ck<k; ck++) {
dist = 0;
for (cd=0; cd<d; cd++) {
error = dataset[cn][cd] - centroids[ck][cd];
dist = dist + (error * error); total_ops_threads[id]++;
}
if (dist < mindist) {
mindist = dist;
mink = ck;
}
}
distances[cn] = mindist;
clusters[cn] = mink;
dist_sum_threads[id] += mindist;
}
// bad parallel reduction
//#pragma omp parallel for reduction(+:dist_sum)
//for (cp=0; cp<p; cp++){
// dist_sum += dist_sum_threads[cp];
//}
}
// -- end time --
wtime_end = omp_get_wtime ();
// -- total wall time --
wtime_spent = wtime_end - wtime_start;
// sequential reduction
for (cp=0; cp<p; cp++)
dist_sum += dist_sum_threads[cp];
// stats
*(total_ops) = 0;
for (cp=0; cp<p; cp++)
*(total_ops) += total_ops_threads[cp];
return centroids;
}
最佳答案
您的代码不是mcve,我只能在这里发出假设。但是,这是我认为(可能)发生的事情(没有特定的重要性顺序):
dist_sum_threads
和total_ops_threads
时,您会遭受错误共享的困扰。您可以通过简单地声明reduction( +: dist_sum )
并直接在dist_sum
区域内使用parallel
来完全避免使用前者。您也可以使用声明为total_ops_threads
的本地total_ops
对reduction(+)
进行同样的处理,最后将其累积到*total_ops
中。 (顺便说一句,dist_sum
已计算,但从未使用过...)numactl
和/或与proc_bind
的线程相似性)。您还可以尝试使用线程调度策略和/或尝试查看是否无法将某些循环平铺应用于阻止数据进入缓存的问题。 omp_get_wtime()
进行任何此类测量。 尝试解决这些问题,并根据您的内存架构评估实际的潜在加速。如果您仍然觉得自己没有达到应有的水平,请更新您的问题。
编辑:
由于您提供了完整的示例,因此我设法对您的代码进行了一些试验并实现了我所想到的修改(以最大程度地减少错误共享)。
以下是该函数的外观:
double **parallel_compute_distances( double **dataset, int n, int d,
int k, long int *total_ops ) {
// reset before parallel region
dist_sum = 0;
// -- start time --
wtime_start = omp_get_wtime ();
long int tot_ops = 0;
// parallel loop
# pragma omp parallel for reduction( +: dist_sum, tot_ops )
for ( int cn = 0; cn < n; cn++ ) {
double mindist = DMAX;
int mink = 0;
for ( int ck = 0; ck < k; ck++ ) {
double dist = 0;
for ( int cd = 0; cd < d; cd++ ) {
double error = dataset[cn][cd] - centroids[ck][cd];
dist += error * error;
tot_ops++;
}
if ( dist < mindist ) {
mindist = dist;
mink = ck;
}
}
distances[cn] = mindist;
clusters[cn] = mink;
dist_sum += mindist;
}
// -- end time --
wtime_end = omp_get_wtime ();
// -- total wall time --
wtime_spent = wtime_end - wtime_start;
// stats
*(total_ops) = tot_ops;
return centroids;
}
因此,一些评论:
dist_sum
和一个用于操作总数的局部变量(tot_ops
)声明为reduction(+:)
。这避免了每个索引只有一个线程访问同一数组,这会触发false sharing(这几乎是触发它的完美案例)。我使用了局部变量而不是total_ops
,因为后来它是一个指针,它不能直接在reduction
子句中使用。但是,最后使用tot_ops
更新它就可以了。 private
声明,这些声明通常是OpenMP程序员的主要陷阱。现在,您只需要考虑两个reduction
变量和两个数组,它们显然是shared
,因此不需要任何额外的声明。这大大简化了parallel
指令,并有助于将重点放在parallel
和for
指令以提高可读性(可能还有性能)。 schedule
子句,以使编译器和/或运行时库使用其默认值。如果我有充分的理由,我只会选择不同的调度策略。 这样一来,在我的双核笔记本电脑上使用GCC 5.3.0并使用
-std=c99 -O3 -fopenmp -mtune=native -march=native
进行编译时,我在各种线程数下都获得了一致的结果,并且两个线程的速度提高了2倍。在使用Intel编译器和
-std=c99 -O3 -xhost -qopenmp
的10核计算机上,我得到了从1到10个线程的线性加速...即使在Xeon Phi KNC上,我也可以从1到60个线程获得近乎线性的加速(然后使用更多的硬件线程仍然可以实现一定的加速,但幅度不尽相同)。
观察到的加速使我意识到,与我的假设不同,代码不是受内存限制的,因为您访问的数组实际上得到了很好的缓存。这样做的原因是,您仅访问第二维很小(40和16)的
dataset[cn][cd]
和centroids[ck][cd]
,因此非常适合高速缓存,而可以有效地预取要加载的dataset
块,以供下一个cn
索引使用。您仍然在此版本的代码中遇到可伸缩性问题吗?