4 Linux 进程管理

4.8 IPC 消息队列

linux 中进程通信两种高级通信机制 IPC 的消息队列和共享内存。IPC 消息队列,其中涉及的函数和数据结构分别定义在 ipc/msg.c 和 include/linux/msg.h。

4.8.1 消息队列的结构

IPC 消息队列一般用于客户机/服务器(C/S)模型中,客户机进程向服务器发送请求服务的消息,服务器进程接受到消息后执行客户机请求

1.消息

IPC 中一个消息由消息头和消息正文组成。消息头是一个 msg 结构体。其定义如下:

struct msg
{
struct msg *msg_next; /*指向下一个消息*/
long msg_type; /*消息类型:大于 0,是通信双方约定的消息标志*/
char *msg_spot; /*消息正文地址*/
time_t msg_time; /*消息发送时间*/
short msg_ts; /*消息正文大小,最大长度由 MSGMAX 决定,缺省为
4057 字节*/
}
2.消息队列

Linux 中可以根据进程的需要建立多个 IPC 消息队列,消息队列的最大数量由符号常量 MSGMNI 决定,缺省为 128。系统对所有消息队列统一管理。每个消息队列有唯一的标识号。每个消息队列是由消息结构体构成的单向链表。
描述消息队列的数据结构 struct msqid_ds,称为消息队列描述符。与消息队列一一对应。其定义如下:

struct msqid_ds
{
Struct ipc_perm msg_perm; /*访问权限*/
Struct msg *msg_first; /*指向消息队列头*/
Struct msg *msg_last; /*指向消息队列尾*/
Time_t msg_stime; /*最近发送消息的时间*/
Time_t msg_rtime; /*最近接收消息的时间*/
Time_t msg_ctime; /*最近修改消息的时间*/
Struct wait_queue *wait; /*等待接收消息的进程等待队列*/
Struct wait_queue *wait; /*等待发送消息的进程等待队列*/
Unshort msg_cbytes; /*队列中当前字节数*/
Unshort msg_qnum; /*队列中消息数*/
Unshort msg_qbytes; /*队列最大字节数,不能超过 MSGMN(缺省 16384)*/
Unshort msg_lspid; /*最近发送进程的 PID*/
Unshort msg_lrpid; /*最近接收进程的 PID*/
}

<Linux>(极简关键、省时省力)《Linux操作系统原理分析之Linux 进程管理 8》(12)-LMLPHP

4.8.2 消息队列的生成与控制

1.建立及检索消息队列 建立及检索消息队列

在程序中使用 msgget()建立一个消息队列或者检索消息队列的标识号。其定义为:

Int msgget(key_t key, int msgflg) /*msgflg 的值与 semget()中的 semflg 完全相同*/

调用成功则返回消息队列的标识号,否则返回-1;

2.消息队列的控制 消息队列的控制

对消息队列的控制操作由系统调用 msgctl()实现,其定义如下:

Int msgctl(int msqid,int cmd,struct msqid_ds *buf)

说明:cmd 参数的种类与 semctl()类似,但制定操作的意义不同。主要有:

4.8.3 消息的发送与接收

进程使用系统调用 msgsnd()向消息队列写消息;接受进程使用系统调用 msgrcv()从消息队列读取消息。

Int msgsnd(int msqid,struct msgbuf *msgp,int msgsz,int msgopt);
Int msgrcv(int msqid,struct msgbuf *msgp,int msgsz,int msgtyp,int msgopt);

Msgsnd()调用成功返回 0,msgrcv()调用成功返回值为接收到的消息的长度;两个函数调用失败返回错误信息。
👉Msqid:消息队列标识号。
👉Msgp:指向一个 msgbuf 结构体 其定义为:

Struct msgbuf
{
Long mtype; /*消息类型*/
Char mtext[1]/*消息正文*/
}

说明:

👉Msgsz:消息正文长度。
👉Msgopt:操作模式。

👉Msgtyp:从消息队列读取消息的方式。

4.8.4 消息队列的程序例

使用消息队列在进程间通信的程序例,其功能是在进程间完成文件的传送。一个发送进程把两个文件送入消息队列,另外两个接收进程分别从消息队列各自接收一个文件。该例由 3 个源程序文件组成:user.h 是共用的头部文件,filesnd.c 的功能是把两个文件读出并发送到消息队列,filercv.c 的功能是从消息队列读取文件。发送进程执行 filesnd.c,两个接收进程执行 filercv.c。

/*user.h*/
#define F_KEY “copyfile” /*指定生成 IPC 键值的文件名*/
#define F_TYPE1 1 /*第一个文件的消息类型*/
#define F_TYPE2 2 /*第二个文件的消息类型*/
#define F_TYPE_END1 3 /*第一个文件接收完毕的消息类型*/
#define F_TYPE_END2 4 /*第二个文件接收完毕的消息类型*/
#define BUF_SIZE 256
Struct msg_data /*替代上节所讲 msgbuf 结构体,作为发送区(接收区)*/
{
Long type;
Int size;
Char buf[BUF_SIZE];
}



/*filesnd.c 数据发送程序*/
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include < sys/stat.h >
#include “user.h “
Close_all(int msgid, int fd1, int fd2)
{
Close(fd1); /*关闭文件*/
Close(fd2); /*关闭文件*/
Msgctl(msgid,IPC_RMID,0);/*删除消息队列*/
}
Mainint argc,char **argv)
{
Int fd1,fd2,msgid;
Int end1=1,end2=1Key_t key;
Struct msg_data data1,data2;
If(argc!=3/*检查命令行参数*/
{
Printf(”usage:argv[0] File1 File2 <CR>\n”);
Exit-1);
}
If((fd1=open(argv[1]O_RDONLY==-1/*打开第一个文件,只读*/
{
Printf(%s can’t opened\n”,argv[1]);
Exit-1);
}
If((fd2=open(argv[2]O_RDONLY==-1/*打开第二个文件,只读*/
{
Printf(%s can’t opened\n”,argv[2]);
Exit-1);
}
If((key=ftok(F_KEY,’a’))==-1/*生成 IPC 键值*/
{
Close(fd1);
Close(fd2);
Printf(Sender: create Key Error.\n”);
Exit-1);
}
If((msgid=msgget(key,IPC_CREAT|IPC_EXCL|0666))==-1) /*生成消息队列*/
{
Close(fd1);
Close(fd2);
Printf(Sender: create Message queue Error.\n”);
Exit-1);
}
Printf(“\nSender: create Message queue created.\n”);
Printf(Copy %s and %s\n”,argv[1],argv[2]);
Data1.type=F_TYPE1; /*确定两个文件的消息类型*/
Data2.type=F_TYPE2;
Data1.size= Data2.size=-1;
While(data1.size||data2.size) /*读取和发送两个文件*/
{
If(data1.size)
{
Data1.size=read(fd1,data1.buf,BUF_SIZE) /*读取第一个文件*/
If(data1.size==-1{
Close_all(msgid,fd1,fd2);
Printf(Sender:error read file%s \n”,argv[1]);
Exit-1);
}
/*写入消息队列*/
If(msgsnd(msgid,(struct msgbuf*&data1,sizeof(struct msg_data),0==-1{
Printf(Sender:error message send file%s \n”,argv[1]);
Exit-1);
}
}
If(data2.size)
{
Data2.size=read(fd2,data2.buf,BUF_SIZE) /*读取第二个文件*/
If(data2.size==-1{
Close_all(msgid,fd1,fd2);
Printf(Sender:error read file%s \n”,argv[2]);
Exit-1);
}
/*写入消息队列*/
If(msgsnd(msgid,(struct msgbuf*&data2,sizeof(struct msg_data),0==-1{
Printf(Sender:error message send file%s \n”,argv[2]);
Exit-1);
}
}
}
Printf(Sender:data transmission to message queue complete \n”);
Printf(Sender:waiting for receiver to finish reading\n”);
/*等待接收进程结束*/
While(end1||end2)
{
Struct msgbuf rec;
If(msgrcv(msgid,&rec,sizeof(struct msgbuf),TYPE_END1,IPC_NOWAIT)!=-1)
End1=0;
If(msgrcv(msgid,&rec,sizeof(struct msgbuf),TYPE_END2,IPC_NOWAIT)!=-1)
End2=0;
}
Printf(“\nSender:disconnection of the two receiver processes \n”);
Close_all(msgid,fd1,fd2);
}




/*filercv.c 数据接收程序*/
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include < sys/stat.h >
#include “user.h “
Mainint argc,char **argv)
{
Int fd,ret,msgid;
Int no;
Int type;
Key_t key;
Struct msg_data data;
Struct msgbuf dis_msg;
If(argc!=3/*检查命令行参数*/
{
Printf(”usage:filerv 1|2 File_name <CR>\n”);
Exit-1);
}
No=atoi(argv[1])If((no!=1&&(no!=2){
Printf(”usage:%s 1|2 File_name <CR>\n”,argv[0]);
Exit-1);
}
If(no==1)type = F_TYPE1;
Else type = F_TYPE2;
If((key=ftok(F_KEY,’a’))==-1) /*生成 IPC 键值*/
{
Printf(“receiver: create Key Error.\n”);
Exit-1);
}
If((msgid=msgget(key,IPC_EXCL))=-1/*得到键值对应的消息队列标识号*/
{
Printf(“receive:create message queue error\n”);
Exit(-1);
}
Printf(“\nreceive %d connected to the message queue \n”,no,getid();
If((fd2=open(argv[2]O_WRONLY==-1/*打开文件,只写*/
{
Printf(%s can’t opened\n”,argv[2]);
Exit-1);
}
While1{ /*从消息队列接收消息*/
If(msgrcv(msgid,(struct msgbuf*&data, sizeof(struct msg_data),type,0)!=-1)
{
Printf(Receiver %d error during reception.\n”,no);
Exit-1);
}
If(data.size==0) break;
Ret = write(fd,data.buf,data.size) /*把读取的消息写入文件*/
If(ret==-1{
Close(fd);
Msgctl(msgid,IPC_RMID0);
Printf(“receiver %d error during writing\n“,no);
Exit-1);
}
}
Close(fd);
Printf(“receiver %d disconnection\n“,no);
If(no==1Dis_msg.mtype = TYPE_END1;
else
Dis_msg.mtype = TYPE_END2;
Msgsnd(msgid,&dis_msg,sizeof(struct msgbuf),0);/*发送接收完毕消息*/
}
该程序分别由三个进程执行,使用的命令和运行结果如下:
#./filesnd dbfile1 dbfile2 &
Sender: message queue create;
Sender: data transmission to message queue completed
Sender: waiting for receiver to finish reading
#./filercv 1 file1&
Receiver 1 connected to the message queue
Receiver 1 disconnection
#./filercv 1 file2 
Receiver 2 connected to the message queue
Receiver 2 disconnection
Sender: disconnection of the two receiver processes.
11-29 14:08