在C程序中,某些线程(pthread1pthread2,...)生成一条消息,并且所产生的消息由另一个线程(pthreadprint)处理,该线程将打印它们。

这些消息在处理之前可能会“困惑”:根据pthreadprint打印所需的时间,在一定时间可以堆叠多条消息作为pthreadprint的输入。我显然不知道在最坏的情况下可以等待处理的最大邮件数。

将这些消息(从“ cc”,“ pthread1”,“产生”它们的...)发送到打印它们的pthread2的最佳方法是什么?它们不仅应该被传输,而且应该被“存储”。我知道互斥体和条件变量,但是它们不代表队列:是否可以使用所有线程都可以访问的FIFO队列?

最佳答案

解决这类生产者-消费者问题的一种简单而强大的解决方案是使用受互斥锁保护的消息的单链接列表。使用C99和pthreads:

#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <stdarg.h>
#include <stdio.h>
#include <errno.h>

struct message {
    struct message *next;
    /* Payload is irrelevant, here just as an example: */
    size_t          size;
    char            data[];
};

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  more;
    struct message *newest;
    struct message *oldest;
} queue;
#define QUEUE_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL }

/* Nonblocking variant */
struct message *try_dequeue(queue *const q);

/* Blocking variants */
int enqueue(queue *const q, struct message *const m);
struct message *dequeue(queue *const q);

/* Suggested interface for queueing a new message */
int queue_printf(queue *const q, const char *const format, ...);


实现很简单。


struct message单链接列表首先显示最早的消息,并在末尾附加新消息。
空队列同时具有newest == NULLoldest == NULL
在检查指针之前,所有队列操作(enqueue()dequeue()try_dequeue())都将使用队列lock互斥锁。 (为减少大量使用时的争用,请使锁定持续时间尽可能短;换句话说,在获取锁定之前,首先要完全构造消息。)
阻塞出队调用可以通过等待more条件变量(当队列为空时)来等待新消息。
当第一条消息入队时,newestoldest都指向它。
当将第一个消息放入队列时,如果存在阻塞出队等待新消息的情况,则会发出条件变量more的信号。
使更多消息入队,将newest->next设置为首先指向新消息,然后将newest设置为指向新消息。
出队将oldest成员从列表中分离出来,将oldest更新为指向oldest->next。如果oldest变为NULL(然后newestoldest都指向同一消息,这是队列中唯一的消息),由于队列为空,因此newest也将设置为NULL
仅当锁定lock互斥锁失败时(通常仅在C库检测到死锁情况下才失败),或者如果您检查队列结构处于不一致状态(例如,但不是,则不行),则使消息入队失败例如,newestoldest都为NULL
上面原型中的逻辑是,如果成功,则返回0,否则返回errno错误代码(EINVALEDEADLK)。我还喜欢将errno设置为该错误代码,以便与出队对称。
使消息出队可能会因与入队相同的原因而失败,以及队列为空(EWOULDBLOCK / EAGAIN)时也会失败。在这些情况下,函数可能会返回设置了NULLerrno


如您所见,入队和出队都是O(1)操作,即花费固定的时间;无需在任何时候遍历整个列表。

只需重复上述操作,入队/出队操作就可以一次使多条消息入队/出队。但是,在实践中很少需要这样做。 (对于出队,主要原因是,如果您一次抓取多个消息,并且一条消息出现故障或错误,那么您也必须处理该错误以及尚未处理但出队的消息; bug-更容易一件一件地做事。此外,如果消息顺序不是很严格,那么如果他们逐个出队,您可能总是有多个消费者并行工作。)



高级说明:

如果依赖于C99标准,则可以对以struct message *next;开头的任何结构类型使用相同的代码。根据C99规则,此类结构是兼容的(对于该共享的初始部分),并且这是队列操作访问的唯一部分。

换句话说,如果您有多种消息类型,每种消息类型都存储在自己的队列中,则对于所有不同的消息类型,只需一个enqueue() / dequeue() / try_dequeue()实现,只要所有消息结构都以struct message *next;

为了确保类型安全,您需要一些简单的包装函数:

static inline int enqueue_yourtype(yourtype_queue *const q, struct yourtype_message *const m)
{
    return enqueue((queue *const)q, (struct message *const)m);
}

static inline struct yourtype_message *dequeue_yourtype(yourtype_queue *const q)
{
    return dequeue((queue *const)q);
}

static inline struct yourtype_message *try_dequeue_yourtype(yourtype_queue *const q)
{
    return try_dequeue((queue *const)q);
}


在头文件中定义时,实际上不应产生任何开销-实际上,无论如何都不应生成任何其他代码,除非您出于某种原因使用一个地址(在这种情况下,必须在其中发出一个非内联版本)每个编译单元采用一个地址)。但是,它们确实在编译时提供类型检查,这通常很有用。

07-26 05:23