我有以下代码
#include "stdio.h"
#include "stdlib.h"
#include <string.h>
#define MAXBINS 8
void swap_long(unsigned long int **x, unsigned long int **y){
unsigned long int *tmp;
tmp = x[0];
x[0] = y[0];
y[0] = tmp;
}
void swap(unsigned int **x, unsigned int **y){
unsigned int *tmp;
tmp = x[0];
x[0] = y[0];
y[0] = tmp;
}
void truncated_radix_sort(unsigned long int *morton_codes,
unsigned long int *sorted_morton_codes,
unsigned int *permutation_vector,
unsigned int *index,
int *level_record,
int N,
int population_threshold,
int sft, int lv){
int BinSizes[MAXBINS] = {0};
unsigned int *tmp_ptr;
unsigned long int *tmp_code;
level_record[0] = lv; // record the level of the node
if(N<=population_threshold || sft < 0) { // Base case. The node is a leaf
memcpy(permutation_vector, index, N*sizeof(unsigned int)); // Copy the pernutation vector
memcpy(sorted_morton_codes, morton_codes, N*sizeof(unsigned long int)); // Copy the Morton codes
return;
}
else{
// Find which child each point belongs to
int j = 0;
for(j=0; j<N; j++){
unsigned int ii = (morton_codes[j]>>sft) & 0x07;
BinSizes[ii]++;
}
// scan prefix
int offset = 0, i = 0;
for(i=0; i<MAXBINS; i++){
int ss = BinSizes[i];
BinSizes[i] = offset;
offset += ss;
}
for(j=0; j<N; j++){
unsigned int ii = (morton_codes[j]>>sft) & 0x07;
permutation_vector[BinSizes[ii]] = index[j];
sorted_morton_codes[BinSizes[ii]] = morton_codes[j];
BinSizes[ii]++;
}
//swap the index pointers
swap(&index, &permutation_vector);
//swap the code pointers
swap_long(&morton_codes, &sorted_morton_codes);
/* Call the function recursively to split the lower levels */
offset = 0;
for(i=0; i<MAXBINS; i++){
int size = BinSizes[i] - offset;
truncated_radix_sort(&morton_codes[offset],
&sorted_morton_codes[offset],
&permutation_vector[offset],
&index[offset], &level_record[offset],
size,
population_threshold,
sft-3, lv+1);
offset += size;
}
}
}
我试图做这个块
int j = 0;
for(j=0; j<N; j++){
unsigned int ii = (morton_codes[j]>>sft) & 0x07;
BinSizes[ii]++;
}
通过用以下内容代替
int rc,j;
pthread_t *thread = (pthread_t *)malloc(NTHREADS*sizeof(pthread_t));
belong *belongs = (belong *)malloc(NTHREADS*sizeof(belong));
pthread_mutex_init(&bin_mtx, NULL);
for (j = 0; j < NTHREADS; j++){
belongs[j].n = NTHREADS;
belongs[j].N = N;
belongs[j].tid = j;
belongs[j].sft = sft;
belongs[j].BinSizes = BinSizes;
belongs[j].mcodes = morton_codes;
rc = pthread_create(&thread[j], NULL, belong_wrapper, (void *)&belongs[j]);
}
for (j = 0; j < NTHREADS; j++){
rc = pthread_join(thread[j], NULL);
}
并在递归函数之外定义这些
typedef struct{
int n, N, tid, sft;
int *BinSizes;
unsigned long int *mcodes;
}belong;
pthread_mutex_t bin_mtx;
void * belong_wrapper(void *arg){
int n, N, tid, sft, j;
int *BinSizes;
unsigned int ii;
unsigned long int *mcodes;
n = ((belong *)arg)->n;
N = ((belong *)arg)->N;
tid = ((belong *)arg)->tid;
sft = ((belong *)arg)->sft;
BinSizes = ((belong *)arg)->BinSizes;
mcodes = ((belong *)arg)->mcodes;
for (j = tid; j<N; j+=n){
ii = (mcodes[j] >> sft) & 0x07;
pthread_mutex_lock(&bin_mtx);
BinSizes[ii]++;
pthread_mutex_unlock(&bin_mtx);
}
}
但是,执行所需的时间比串行的要多得多。为什么会这样?我应该改变什么?
最佳答案
由于您使用单个互斥锁来保护对BinSizes
数组的更新,因此最终您仍将依次对该数组进行所有更新:在任何给定时间,只有一个线程可以调用BinSizes[ii]++
。基本上,您仍然按顺序执行函数,但是会产生创建和销毁线程的额外开销。
我可以为您想到几种选择(可能还有更多选择):
按照@Chris的建议进行操作,并使每个线程更新其中一部分BinSizes
。根据以下属性,这可能不可行
您用于计算ii
的计算。
创建多个互斥锁代表不同的分区BinSizes
。例如,如果BinSizes
具有10个元素,则可以
为元素0-4创建一个互斥量,为元素5-9创建另一个互斥量,
然后在您的线程中使用它们,如下所示:
if (ii < 5) {
mtx_index = 0;
} else {
mtx_index = 1;
}
pthread_mutex_lock(&bin_mtx[mtx_index]);
BinSizes[ii]++;
pthread_mutex_unlock(&bin_mtx[mtx_index]);
您可以将此思想推广到任何大小的BinSizes和任何范围:
每个数组元素可能都有不同的互斥量。当然
那么您就要承担创建这些互斥对象的开销,并且
如果有人试图一次锁定其中的几个,则可能导致死锁等。
最后,您可以完全放弃并行处理此块的想法:正如其他用户所提到的那样,使用线程会受到一定程度的收益递减的影响。除非您的
BinSizes
数组很大,否则即使您“正确执行”,也可能看不到并行化的巨大好处。关于c - 在C中使用pthread的递归函数,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/26765215/