!!! HOMEWORK !!!

Please don't post code, because I want to improve on my own. If possible, point me in the right direction with general information, by pointing out mistakes in my thinking, or with other potentially useful and relevant resources.



I have a method that constructs my npages * npages square matrix hat of doubles, for use in my PageRank algorithm.

I have implemented it with pthreads, with SIMD, and with both pthreads and SIMD. I profiled with the Xcode Instruments time profiler and found that the pthreads-only version is the fastest, followed by the SIMD-only version, and the version with both SIMD and pthreads is the slowest.

Since this is homework, it may be run on a number of different machines, but we were given the header to #include, so it can be assumed that at least AVX is available. The program is given the number of threads to use as a program argument, and it is stored in the global variable g_nthreads.

In my testing I have been running it on my own machine, which is an Ivy Bridge with 4 hardware cores and 8 logical cores, and I have tested it with both 4 threads and 8 threads as the argument.



Runtimes:

SIMD only:

331 ms - for the construct_matrix_hat function

PTHREADS only (8 threads):

70 ms - with each thread running concurrently

SIMD and PTHREADS (8 threads):

110 ms - with each thread running concurrently



What am I doing, when using both optimisations together, that is slowing my program down?

I will post each implementation:



All versions share the following macros:

#define BIG_CHUNK    (g_n2/g_nthreads)
#define SMALL_CHUNK  (g_npages/g_nthreads)
#define MOD          BIG_CHUNK - (BIG_CHUNK % 4)
#define IDX(a, b)    ((a * g_npages) + b)


Pthreads:

// struct used for passing arguments
typedef struct {
    double* restrict m;
    double* restrict m_hat;
    int t_id;
    char padding[44];
} t_arg_matrix_hat;

// Construct matrix_hat with pthreads
static void* pthread_construct_matrix_hat(void* arg) {
    t_arg_matrix_hat* t_arg = (t_arg_matrix_hat*) arg;
    // set coordinate limits thread is able to act upon
    size_t start = t_arg->t_id * BIG_CHUNK;
    size_t end = t_arg->t_id + 1 != g_nthreads ? (t_arg->t_id + 1) * BIG_CHUNK : g_n2;

    // Initialise coordinates with given uniform value
    for (size_t i = start; i < end; i++) {
        t_arg->m_hat[i] = ((g_dampener * t_arg->m[i]) + HAT);
    }

    return NULL;
}

// Construct matrix_hat
double* construct_matrix_hat(double* matrix) {
    double* matrix_hat = malloc(sizeof(double) * g_n2);

    // create structs to send and retrieve matrix and value from threads
    t_arg_matrix_hat t_args[g_nthreads];
    for (size_t i = 0; i < g_nthreads; i++) {
        t_args[i] = (t_arg_matrix_hat) {
            .m = matrix,
            .m_hat = matrix_hat,
            .t_id = i
        };
    }
    // create threads and send structs with matrix and value to divide the matrix and
    // initialise the coordinates with the given value
    pthread_t threads[g_nthreads];
    for (size_t i = 0; i < g_nthreads; i++) {
        pthread_create(threads + i, NULL, pthread_construct_matrix_hat, t_args + i);
    }
    // join threads after all coordinates have been intialised
    for (size_t i = 0; i < g_nthreads; i++) {
        pthread_join(threads[i], NULL);
    }

    return matrix_hat;
}




SIMD:

// Construct matrix_hat
double* construct_matrix_hat(double* matrix) {
    double* matrix_hat = malloc(sizeof(double) * g_n2);

    double dampeners[4] = {g_dampener, g_dampener, g_dampener, g_dampener};
    __m256d b = _mm256_loadu_pd(dampeners);
    // Use simd to subtract values from each other
    for (size_t i = 0; i < g_mod; i += 4) {
        __m256d a = _mm256_loadu_pd(matrix + i);
        __m256d res = _mm256_mul_pd(a, b);
        _mm256_storeu_pd(&matrix_hat[i], res);
    }
    // Subtract values from each other that weren't included in simd
    for (size_t i = g_mod; i < g_n2; i++) {
        matrix_hat[i] = g_dampener * matrix[i];
    }
    double hats[4] = {HAT, HAT, HAT, HAT};
    b = _mm256_loadu_pd(hats);
    // Use simd to raise each value to the power 2
    for (size_t i = 0; i < g_mod; i += 4) {
        __m256d a = _mm256_loadu_pd(matrix_hat + i);
        __m256d res = _mm256_add_pd(a, b);
        _mm256_storeu_pd(&matrix_hat[i], res);
    }
    // Raise each value to the power 2 that wasn't included in simd
    for (size_t i = g_mod; i < g_n2; i++) {
        matrix_hat[i] += HAT;
    }

    return matrix_hat;
}




Pthreads and SIMD:

// struct used for passing arguments
typedef struct {
    double* restrict m;
    double* restrict m_hat;
    int t_id;
    char padding[44];
} t_arg_matrix_hat;

// Construct matrix_hat with pthreads
static void* pthread_construct_matrix_hat(void* arg) {
    t_arg_matrix_hat* t_arg = (t_arg_matrix_hat*) arg;
    // set coordinate limits thread is able to act upon
    size_t start = t_arg->t_id * BIG_CHUNK;
    size_t end = t_arg->t_id + 1 != g_nthreads ? (t_arg->t_id + 1) * BIG_CHUNK : g_n2;
    size_t leftovers = start + MOD;

    __m256d b1 = _mm256_loadu_pd(dampeners);
    //
    for (size_t i = start; i < leftovers; i += 4) {
        __m256d a1 = _mm256_loadu_pd(t_arg->m + i);
        __m256d r1 = _mm256_mul_pd(a1, b1);
        _mm256_storeu_pd(&t_arg->m_hat[i], r1);
    }
    //
    for (size_t i = leftovers; i < end; i++) {
        t_arg->m_hat[i] = dampeners[0] * t_arg->m[i];
    }

    __m256d b2 = _mm256_loadu_pd(hats);
    //
    for (size_t i = start; i < leftovers; i += 4) {
        __m256d a2 = _mm256_loadu_pd(t_arg->m_hat + i);
        __m256d r2 = _mm256_add_pd(a2, b2);
        _mm256_storeu_pd(&t_arg->m_hat[i], r2);
    }
    //
    for (size_t i = leftovers; i < end; i++) {
        t_arg->m_hat[i] += hats[0];
    }

    return NULL;
}

// Construct matrix_hat
double* construct_matrix_hat(double* matrix) {
    double* matrix_hat = malloc(sizeof(double) * g_n2);

    // create structs to send and retrieve matrix and value from threads
    t_arg_matrix_hat t_args[g_nthreads];
    for (size_t i = 0; i < g_nthreads; i++) {
        t_args[i] = (t_arg_matrix_hat) {
            .m = matrix,
            .m_hat = matrix_hat,
            .t_id = i
        };
    }
    // create threads and send structs with matrix and value to divide the matrix and
    // initialise the coordinates with the given value
    pthread_t threads[g_nthreads];
    for (size_t i = 0; i < g_nthreads; i++) {
        pthread_create(threads + i, NULL, pthread_construct_matrix_hat, t_args + i);
    }
    // join threads after all coordinates have been intialised
    for (size_t i = 0; i < g_nthreads; i++) {
        pthread_join(threads[i], NULL);
    }

    return matrix_hat;
}

Best Answer

I think it's because your SIMD code is horribly inefficient: it loops over the memory twice, instead of doing the add along with the multiply before storing. You didn't test SIMD against a scalar baseline, but if you had, you would probably find that your SIMD code wasn't a speedup with a single thread either.

STOP READING HERE if you want to solve the rest of your homework yourself.

If you compile with gcc -O3 -march=ivybridge, the simple scalar loop in the pthreads version probably auto-vectorizes into something like what you should have done with intrinsics. You even used restrict, so the compiler may realise that the pointers can't overlap each other, or with g_dampener.

// this probably autovectorizes well.
// Initialise coordinates with given uniform value
for (size_t i = start; i < end; i++) {
    t_arg->m_hat[i] = ((g_dampener * t_arg->m[i]) + HAT);
}


// but this would be even safer to help the compiler's aliasing analysis:
double dampener = g_dampener;   // in case the compiler thinks one of the pointers might point at the global
double *restrict hat = t_arg->m_hat;
const double *restrict mat = t_arg->m;
// ... same loop as above, but using these locals instead of the t_arg-> members


That probably isn't a problem for an FP loop, though, since a double definitely can't alias a double *.



The coding style is also pretty nasty. You should give your __m256d variables meaningful names where possible.

Also, you use malloc, which doesn't guarantee that matrix_hat will be aligned to a 32B boundary. C11's aligned_alloc is probably the nicest way, versus posix_memalign (clunky interface), _mm_malloc (has to be freed with _mm_free, not free(3)), or other options.
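For illustration only (this sketch is mine, not part of the original answer; it assumes <stdlib.h> and <immintrin.h> are included, g_n2 is already set, and the pointer names are placeholders), those alternatives look roughly like this:

// posix_memalign: clunky interface, returns an error code instead of a pointer
double* p1 = NULL;
if (posix_memalign((void**)&p1, 32, sizeof(double) * g_n2) != 0) {
    /* allocation failed */
}
/* ... later: free(p1); */

// _mm_malloc: must be released with _mm_free, not free(3)
double* p2 = _mm_malloc(sizeof(double) * g_n2, 32);
/* ... later: _mm_free(p2); */

// C11 aligned_alloc: formally the size should be a multiple of the alignment
double* p3 = aligned_alloc(32, sizeof(double) * g_n2);
/* ... later: free(p3); */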

double* construct_matrix_hat(const double* matrix) {
    // double* matrix_hat = malloc(sizeof(double) * g_n2);
    double* matrix_hat = aligned_alloc(64, sizeof(double) * g_n2);

    // double dampeners[4] = {g_dampener, g_dampener, g_dampener, g_dampener};  // This idiom is terrible, and might actually compile to code that stores it 4 times on the stack and then loads.
    __m256d vdamp = _mm256_set1_pd(g_dampener);    // will compile to a broadcast-load (vbroadcastsd)
    __m256d vhat  = _mm256_set1_pd(HAT);

    size_t last_full_vector = g_n2 & ~3ULL;    // don't load this from a global.
    // it's better for the compiler to see how it's calculated from g_n2

       // ??? Use simd to subtract values from each other          // huh?  this is a multiply, not a subtract.  Also, everyone can see it's using SIMD, that part adds no new information
    // if you really want to manually vectorize this, instead of using an OpenMP pragma or -O3 on the scalar loop, then:
    for (size_t i = 0; i < last_full_vector; i += 4) {
        __m256d vmat = _mm256_loadu_pd(matrix + i);
        __m256d vmul = _mm256_mul_pd(vmat, vdamp);
        __m256d vres = _mm256_add_pd(vmul, vhat);
        _mm256_store_pd(&matrix_hat[i], vres);    // aligned store.  Doesn't matter for performance.
    }
    #if 0
    // Scalar cleanup
    for (size_t i = last_full_vector; i < g_n2; i++) {
        matrix_hat[i] = g_dampener * matrix[i] + HAT;
    }
    #else
    // assume that g_n2 >= 4, and do a potentially-overlapping unaligned vector
    if (last_full_vector != g_n2) {
        // Or have this always run, and have the main loop stop one element sooner (so this overlaps by 0..3 instead of by 1..3 with a conditional)
        assert(g_n2 >= 4);
        __m256d vmat = _mm256_loadu_pd(matrix + g_n2 - 4);
        __m256d vmul = _mm256_mul_pd(vmat, vdamp);
        __m256d vres = _mm256_add_pd(vmul, vhat);
        _mm256_storeu_pd(&matrix_hat[g_n2-4], vres);
    }
    #endif

    return matrix_hat;
}


This version compiles (after defining a couple of globals) to the asm we expect. BTW, normal people pass sizes around as function arguments. That's another way to avoid optimization failures caused by C aliasing rules.
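As a rough sketch of that idea (the signature is hypothetical, not the assignment's interface): everything the loop needs comes in as a parameter, so the optimizer never has to prove that the stores can't modify g_n2 or g_dampener through some aliased pointer.

#include <stdlib.h>   // aligned_alloc, size_t

double* construct_matrix_hat_sized(const double* restrict matrix,
                                   size_t n2, double dampener, double hat)
{
    double* matrix_hat = aligned_alloc(64, sizeof(double) * n2);
    if (!matrix_hat) return NULL;
    for (size_t i = 0; i < n2; i++) {
        matrix_hat[i] = dampener * matrix[i] + hat;   // simple enough to auto-vectorize at -O3
    }
    return matrix_hat;
}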

Anyway, your best bet really is to let OpenMP auto-vectorize it, because then you don't have to write the cleanup loop yourself. There's nothing tricky about the data layout, so it vectorizes trivially. (And it's not a reduction, like in your other question, so there's no loop-carried dependency or order-of-operations problem.)
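A minimal sketch of that approach (again my illustration with a hypothetical helper name, assuming something like gcc -O3 -fopenmp -march=ivybridge):

#include <stddef.h>

// OpenMP splits the iterations across threads and vectorizes each thread's
// chunk; the compiler also emits the scalar tail loop, so no manual cleanup.
static void construct_matrix_hat_omp(double* restrict m_hat, const double* restrict m,
                                     size_t n2, double dampener, double hat)
{
    #pragma omp parallel for simd
    for (size_t i = 0; i < n2; i++) {
        m_hat[i] = dampener * m[i] + hat;
    }
}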

Regarding "c - What am I doing with SIMD and pthreads that is slowing my program down?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/37542238/
