消息队列

  • 消息队列是内核中的一个链表
  • 用户进程将数据传输到内核后,内核重新添加一些如用户ID、组ID、读写进程的ID和优先级等相关信息后并打包成一个数据包称为消息
  • 允许一个或多个进程往消息队列中读写消息,但一个消息只能被一个进程读取,读取完毕后自动删除
  • 消息队列具有一定的FIFO的特性,消息可以按照顺序发送到队列中,也可以几种不同的方式从队列中读取。每一个消息队列在内核中用一个唯一的IPC标识ID表示
  • 消息队列的实现包括创建和打开队列发送消息读取消息控制消息队列四种操作。

消息队列属性

struct msqid_ds
{
struct ipc_perm msg_perm;
msgqnum_t msg_qnum; //消息数量
msglen_t msg_qbytes; //消息最大字节数
pid_t msg_lspid; //最后一次发送消息进程的pid
pid_t msg_lrpid; //最后一次接收消息的pid
pid_t msg_stime; //最后一次消息发送的时间
pid_t msg_ctime; //最后一次消息改变的时间
};

打开或创建消息队列

#include <sys/msg.h>
int msgget(key_t key,int flag);
//返回:如果成功,返回内核中消息队列的标识ID,出错返回-1
  • 参数

    • key:用户指定的消息队列键值
    • flag:IPC CREAT,IPC EXCL等权限组合
  • 若创建消息队列,key可以指定键值,也可以设置为IPC_PRIVATE(0)。若打开进行查询,则key不能为0,必须是一个非零的值,否则查询不到

消息队列控制

#include <sys/msg.h>
int msgctl(int msgid,int cmd,struct msqid_ds *buf);
//返回:成功返回0,出错返回-1
  • 参数

    • msgid:消息队列ID
    • buf:消息队列属性指针
    • cmd
      • IPC_STAT:获取消息队列的属性,取此队列的msqid_ds结构,并放在buf指向的结构中
      • IPC_SET:设置属性,按由buf只想的结构中的值,设置与此队列相关的结构中的字段
      • IPC_RMID:删除队列,从系统中删除该队列以及在队列上的所有数据。

发送消息

#include <sys/msg.h>
int msgsnd(int msgqid,const void *ptr,size_t nbytes,int flag);
//成功返回0,出错返回-1 ptr:
struct mymesg
{
long mtype;//消息类型
char mtext[512];//消息数据本身
};
  • nbytes 指定消息的大小,不包括mtype的大小
  • mtype指消息的类型,由一个整数来表示,且大于0
  • mtext消息数据本身
  • 在Linux中,消息的最大长度是4056个字节,其中包括mtype,占4个字节
  • 结构体mymesg用户可自定义,但第一个成员必须是mtype
  • 参数flag
    • 0:阻塞
    • IPC_NOWAIT:类似文件I/O的非阻塞
    • 若消息队列满(或者是队列中的消息总数等于系统限制值,或队列中的字节数等于系统限制值),则指定IPC_NOWAIT使得msgsnd立即出错返回EAGAIN。如果指定0,则
      • 阻塞直到有空间可以容纳要发送的消息
      • 或从系统中删除了此队列
      • 或捕捉到一个信号,并从信号处理程序返回

接收消息

#include <sys/msg.h>
ssize_t msgrcv(int msgqid,void *ptr,size_t nbytes,long type,int flag);
//成功返回消息数据部分的长度,出错返回-1
  • 参数

    • magqid:消息队列的ID
    • ptr:指向存放消息的缓存
    • nbytes:消息缓存的大小,不包括mtype的大小。计算方式
      • nbytes=sizeof(struct mymesg)-sizeof(long)
    • type:消息类型
      • type==0:获得消息队列中的第一个消息
      • type>0:获得消息队列中类型type的第一个消息
      • type<0:获得消息队列中小于或等于type绝对值的消息
    • flag:0或者IPC_NOWAIT

消息队列发送消息

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h> typedef struct
{
long type;//消息类型
int start;//消息数据本身(包括start和end)
int end;//
}MSG; //往消息队列中发送消息
int main(int argc,char *argv[])
{
if(argc<2)
{
printf("usage:%skey\n",argv[0]);
exit(1);
}
key_t key=atoi(argv[1]);
printf("key:%d\n",key); //创建消息队列
int msq_id;
if((msq_id=msgget(key,IPC_CREAT|IPC_EXCL|0777))<0)
{
perror("msgget error");
}
printf("msq id:%d\n",msq_id); //定义要发送的消息
MSG m1={4,4,400};
MSG m2={2,2,200};
MSG m3={1,1,100};
MSG m4={6,6,600};
MSG m5={6,60,6000}; //发送消息到消息队列
if(msgsnd(msq_id,&m1,sizeof(MSG)-sizeof(long),IPC_NOWAIT)<0)
{
perror("msgsnd error");
}
if(msgsnd(msq_id,&m2,sizeof(MSG)-sizeof(long),IPC_NOWAIT)<0)
{
perror("msgsnd error");
}
if(msgsnd(msq_id,&m3,sizeof(MSG)-sizeof(long),IPC_NOWAIT)<0)
{
perror("msgsnd error");
}
if(msgsnd(msq_id,&m4,sizeof(MSG)-sizeof(long),IPC_NOWAIT)<0)
{
perror("msgsnd error");
}
if(msgsnd(msq_id,&m5,sizeof(MSG)-sizeof(long),IPC_NOWAIT)<0)
{
perror("msgsnd error");
} //发送后区获取消息队列中消息的总数
struct msqid_ds ds;
if(msgctl(msq_id,IPC_STAT,&ds)<0)
{
perror("msgctl error");
}
printf("msg total:%ld\n",ds.msg_qnum); exit(0);
}

运行程序,指定key

$ ./msq_snd 1024
key:1024
msq id:65536
msg total:5

使用命令查看IPCS

$ ipcs -q

使用命令删除MSQ

$ ipcrm -q 65536

IPC 进程间通信方式——消息队列-LMLPHP

05-28 18:10