我有以下代码

    #include "stdio.h"
#include "stdlib.h"
#include <string.h>

#define MAXBINS 8


void swap_long(unsigned long int **x, unsigned long int **y){

  unsigned long int *tmp;
  tmp = x[0];
  x[0] = y[0];
  y[0] = tmp;

}

void swap(unsigned int **x, unsigned int **y){

  unsigned int *tmp;
  tmp = x[0];
  x[0] = y[0];
  y[0] = tmp;

}

void truncated_radix_sort(unsigned long int *morton_codes,
              unsigned long int *sorted_morton_codes,
              unsigned int *permutation_vector,
              unsigned int *index,
              int *level_record,
              int N,
              int population_threshold,
              int sft, int lv){

  int BinSizes[MAXBINS] = {0};
  unsigned int *tmp_ptr;
  unsigned long int *tmp_code;

  level_record[0] = lv; // record the level of the node

  if(N<=population_threshold || sft < 0) { // Base case. The node is a leaf
    memcpy(permutation_vector, index, N*sizeof(unsigned int)); // Copy the pernutation vector
    memcpy(sorted_morton_codes, morton_codes, N*sizeof(unsigned long int)); // Copy the Morton codes

    return;
  }
  else{

    // Find which child each point belongs to
    int j = 0;
    for(j=0; j<N; j++){
      unsigned int ii = (morton_codes[j]>>sft) & 0x07;
      BinSizes[ii]++;
    }


    // scan prefix
    int offset = 0, i = 0;
    for(i=0; i<MAXBINS; i++){
      int ss = BinSizes[i];
      BinSizes[i] = offset;
      offset += ss;
    }

    for(j=0; j<N; j++){
      unsigned int ii = (morton_codes[j]>>sft) & 0x07;
      permutation_vector[BinSizes[ii]] = index[j];
      sorted_morton_codes[BinSizes[ii]] = morton_codes[j];
      BinSizes[ii]++;
    }

    //swap the index pointers
    swap(&index, &permutation_vector);

    //swap the code pointers
    swap_long(&morton_codes, &sorted_morton_codes);

    /* Call the function recursively to split the lower levels */
    offset = 0;
    for(i=0; i<MAXBINS; i++){

      int size = BinSizes[i] - offset;

      truncated_radix_sort(&morton_codes[offset],
               &sorted_morton_codes[offset],
               &permutation_vector[offset],
               &index[offset], &level_record[offset],
               size,
               population_threshold,
               sft-3, lv+1);
      offset += size;
    }


  }
}


我试图做这个块

int j = 0;
    for(j=0; j<N; j++){
      unsigned int ii = (morton_codes[j]>>sft) & 0x07;
      BinSizes[ii]++;
    }


通过用以下内容代替

    int rc,j;
    pthread_t *thread = (pthread_t *)malloc(NTHREADS*sizeof(pthread_t));
    belong *belongs = (belong *)malloc(NTHREADS*sizeof(belong));
    pthread_mutex_init(&bin_mtx, NULL);
    for (j = 0; j < NTHREADS; j++){
        belongs[j].n = NTHREADS;
        belongs[j].N = N;
        belongs[j].tid = j;
        belongs[j].sft = sft;
        belongs[j].BinSizes = BinSizes;
        belongs[j].mcodes = morton_codes;
        rc = pthread_create(&thread[j], NULL, belong_wrapper, (void *)&belongs[j]);
    }

    for (j = 0; j < NTHREADS; j++){
        rc = pthread_join(thread[j], NULL);
    }


并在递归函数之外定义这些

typedef struct{
    int n, N, tid, sft;
    int *BinSizes;
    unsigned long int *mcodes;
}belong;

pthread_mutex_t bin_mtx;

void * belong_wrapper(void *arg){
    int n, N, tid, sft, j;
    int *BinSizes;
    unsigned int ii;
    unsigned long int *mcodes;
    n = ((belong *)arg)->n;
    N = ((belong *)arg)->N;
    tid = ((belong *)arg)->tid;
    sft = ((belong *)arg)->sft;
    BinSizes = ((belong *)arg)->BinSizes;
    mcodes = ((belong *)arg)->mcodes;
    for (j = tid; j<N; j+=n){
        ii = (mcodes[j] >> sft) & 0x07;
        pthread_mutex_lock(&bin_mtx);
        BinSizes[ii]++;
        pthread_mutex_unlock(&bin_mtx);
    }

}


但是,执行所需的时间比串行的要多得多。为什么会这样?我应该改变什么?

最佳答案

由于您使用单个互斥锁来保护对BinSizes数组的更新,因此最终您仍将依次对该数组进行所有更新:在任何给定时间,只有一个线程可以调用BinSizes[ii]++。基本上,您仍然按顺序执行函数,但是会产生创建和销毁线程的额外开销。

我可以为您想到几种选择(可能还有更多选择):


按照@Chris的建议进行操作,并使每个线程更新其中一部分
BinSizes。根据以下属性,这可能不可行
您用于计算ii的计算。
创建多个互斥锁代表不同的分区
BinSizes。例如,如果BinSizes具有10个元素,则可以
为元素0-4创建一个互斥量,为元素5-9创建另一个互斥量,
然后在您的线程中使用它们,如下所示:

if (ii < 5) {
  mtx_index = 0;
} else {
  mtx_index = 1;
}
pthread_mutex_lock(&bin_mtx[mtx_index]);
BinSizes[ii]++;
pthread_mutex_unlock(&bin_mtx[mtx_index]);


您可以将此思想推广到任何大小的BinSizes和任何范围:
每个数组元素可能都有不同的互斥量。当然
那么您就要承担创建这些互斥对象的开销,并且
如果有人试图一次锁定其中的几个,则可能导致死锁等。
最后,您可以完全放弃并行处理此块的想法:正如其他用户所提到的那样,使用线程会受到一定程度的收益递减的影响。除非您的BinSizes数组很大,否则即使您“正确执行”,也可能看不到并行化的巨大好处。

关于c - 在C中使用pthread的递归函数,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/26765215/

10-11 01:24