我必须编写一个 MPI 库,其中每个进程都在做一些独立的任务,但应该对一些可以从其他进程不可预测地发送的消息使用react。
这些消息的发送和接收都是库的一部分,我不能假设库函数将被频繁调用以跟踪立即发送的进度或检查接收队列。如果接收进程正在做一些计算,发送进程可能会被阻塞一段不可预测的时间。

我目前感兴趣的解决方案是让每个 MPI 进程产生一个 pthread 线程,该线程固定在自己的 CPU 上,使用循环中的阻塞接收来接收这些消息。正如我所担心的,我的实验表明这个线程占用了一半的 CPU 时间(我希望阻塞接收能以某种方式与内核一起工作以避免这种情况)。

我通过在一个进程的一个线程中使用一个伪计算函数来衡量这种行为,在另一个线程中使用一个阻塞接收,另一个进程发送一条消息以供第一个进程接收,但只有在计算完成时,这是由计算之后和发送消息之前的屏障。每个进程只有一个线程参与屏障,因此它可以工作。这可以确保接收线程在另一个线程进行计算时确实卡在等待消息中。然后我测量计算时间。设置如下所示:

             +                 +
             | P0              | P1
          +--+--+              |
          |     |              |
compute() |     | Recv(1)      |
          |     |              |
          +--------------------+ Barrier
          |     |              |
          |     |              | Send(0)
          |     |              |
          +     +              +

我试图将阻塞接收更改为一个 MPI_Iprobe 循环,该循环会将 CPU 交给另一个线程,这样如果没有要接收的消息,就不会占用太多 CPU 时间,为此我使用了 sleep(0) 函数作为 pthread_yieldsched_yield需要特权才能将调度策略更改为实时策略,我不确定我是否需要。
然后是 nanosleep 函数来控制间隔。

一个简单的版本如下所示:
int flag;
while (1)
{
    MPI_Iprobe(1, 0, comm, &flag, MPI_STATUS_IGNORE);
    if (flag == 1) break;
    sleep(0);
}

MPI_Recv(NULL, 0, MPI_INT, 1, 0, comm, MPI_STATUS_IGNORE);

这个 似乎 可以解决我的问题。在我的实验中,计算线程花费的时间与没有其他线程的时间几乎相同,相比之下,如果我只是使用阻塞接收 MPI_Recv 或者我没有使用 sleep(0) ,那么这次是两倍。

这是我用来衡量这个的代码:
#define COMPUTE_LOOP_ITER 200000000

void compute()
{
    int p[2];
    for (int i = 0; i < COMPUTE_LOOP_ITER; ++i)
    {
        p[i%2] = i;
    }
}

void * thread_recv_message(void * arg)
{
    MPI_Comm comm = *(MPI_Comm*) arg;

    int flag;
    while (1)
    {
        MPI_Iprobe(1, 0, comm, &flag, MPI_STATUS_IGNORE);

        if (flag == 1) break;
        sleep(0);
    }

    MPI_Recv(NULL, 0, MPI_INT, 1, 0, comm, MPI_STATUS_IGNORE);

    return NULL;
}

// Returns the compute() time on p0, 0 on others
double test(MPI_Comm comm)
{
    int s, p;
    double res = 0;
    MPI_Comm_rank(comm, &s);
    MPI_Comm_size(comm, &p);

    if (p != 2)
    {
        fprintf(stderr, "Requires 2 processes and no more in comm\n");
        fflush(stderr);

        MPI_Abort(comm, 1);
    }

    // Pin each process to its own core
    int cpuid = sched_getcpu();
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpuid, &cpuset);
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);

    if (s == 0)
    {
        pthread_t thr;
        pthread_attr_t attr;

        // Make sure the new thread is pinned on the same core
        pthread_attr_init(&attr);
        pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);

        pthread_create(&thr, &attr, thread_recv_message, &comm);

        double t1,t2;
        t1 = MPI_Wtime();

        compute();

        t2 = MPI_Wtime();

        MPI_Barrier(comm);

        res = t2 - t1;
        pthread_join(thr, NULL);
    }
    else // s == 1
    {
        MPI_Barrier(comm);
        MPI_Send(NULL, 0, MPI_INT, 0, 0, comm);
    }

    MPI_Barrier(comm);

    return res;
}

由于我几乎没有使用 MPI 的经验,也没有将它与线程一起使用,因此这个解决方案对我来说似乎很脆弱,我不知道是否可以依赖它。

我在使用 Linux 内核版本 4.4.0 的 Ubuntu 16.04 上使用 mpich 3.2

这个问题主要是就这个问题和我目前的解决方案征求意见或讨论。如果需要,我可以解释更多我的测试方法或提供更多代码。

最佳答案

由于示例中 computethread_recv_message 之间没有数据依赖关系,因此很难判断接收到的数据究竟做了什么。我也不确定在句子片段中使用“不可预测”的具体含义“应该对某些可以从其他进程不可预测地发送的消息使用react”。

如果您确定等级 x 将在某个时刻将数据发送到等级 y,那么 MPI_IrecvMPI_Test 将实现这种通信风格,而不会阻塞线程调用或完成接收请求。您可以将那些 IrecvTest 调用与您的计算交织在一起,或者每 2 次、64 次或 128 次计算循环迭代或任何合适的迭代调用一次。

如果接收等级事先不知道它将从哪个等级接收,或者数据的大小,那么您可能想要使用 MPI_ProbeMPI_Iprobe 并使用返回的 MPI_Status 结构。对 Iprobe 的调用可以与您的计算交错,类似于我在 Irecv 中所描述的。

也可以使用像 MPI_AlltoallMPI_Allgather 这样的集合来提供与执行 Probes 的许多秩类似的功能,例如,通过交换包含字节计数的数组,这些字节数将在后续点对点调用中在成对的秩之间发送和接收。如果您可以保证所有 rank 最终都会到达集体调用,那么这种方法可能会很好地利用仅对 MPI 内部人员可用的实现细节。您还可以使用等效的非阻塞集合( IalltoallIallgather 等)将此步骤与您的计算重叠。

关于multithreading - 线程中阻塞 MPI_Recv 的 CPU 使用率,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49198247/

10-11 22:27
查看更多