赞
踩
System V IPC(Inter-Process Communication)是一组用于在 Unix-like 操作系统上进行进程间通信的标准,它们最初由 AT&T 在 System V 发行版中引入。System V IPC 提供了三种主要的通信机制:
System V IPC相关的接口如图所示:
System V IPC未遵循“一切都是文件”的Unix哲学,而是采用标识符ID和键值来标识一个System V IPC对象。
System V IPC对象的作用范围是整个操作系统,内核没有维护引用计数。调用各种get函数返回的ID是操作系统范围内的标识符,对于任何进程,无论是否存在亲缘关系,只要有相应的权限,都可以通过操作System V IPC对象来达到通信的目的。
System V IPC对象具有内核持久性。哪怕创建System V IPC对象的进程已经退出,哪怕有一段时间没有任何进程打开该IPC对象,只要不执行删除操作或系统重启,后面启动的进程依然可以使用之前创建的System V IPC对象来通信。
System V IPC对象在文件系统中没有实体文件与之关联。我们不能用文件相关的操作函数来访问它或修改它的属性。所以不得不提供专门的系统调用(如msgctl、semop等)来操作这些对象。在shell中无法用ls查看存在的IPC对象,无法用rm将其删除,也无法用chmod来修改它们的访问权限。幸好Linux提供了ipcs、ipcrm和ipcmk等命令来操作这些对象。
System V IPC对象是靠标识符ID来识别和操作的。该标识符要具有系统唯一性。这和文件描述符不同,文件描述符是进程内有效的。一个进程的文件描述符4和另一个进程的文件描述符4可能毫不相干。但是IPC的标识符ID是操作系统的全局变量,只要知道该值(哪怕是猜测获得的)且有相应的权限,任何进程都可以通过标识符进行进程间通信。
三种IPC对象操作的起点都是调用相应的get函数来获取标识符ID,如消息队列的get函数为:
int msgget(key_t key, int oflg);
其中第一个参数是key_t类型,它其实是一个整型的变量。IPC的get函数将key转换成相应的IPC标识符。根据IPC get函数中的第二个参数oflg的不同,会有不同的控制逻辑:
不同进程可通过同一个key获取标识符ID,进而操作同一个System V IPC对象。那么现在问题就演变成了如何选择key。
选择key有三种方法:
id = msgget(IPC_PRIVATE,S_IRUSR | S_IWUSR);
ftok是file to key的意思,多个进程通过同一个路径名获得相同的key值,进而得到同一个IPC标识符。
ftok函数接口的定义如下:
#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
这个函数在Linux上的实现是:按照给定的路径名,获取到文件的stat信息,从stat信息中取出st_dev和st_ino,然后结合给出的proj_id,按照图所示的算法获取到32位的key值。
管道和FIFO都是字节流的模型,这种模型不存在记录边界。如果从管道里面读出100个字节,你无法确认这100个字节是单次写入的100字节,还是分10次每次10字节写入的,你也无法知晓这100个字节是几个消息。管道或FIFO里的数据如何解读,完全取决于写入进程和读取进程之间的约定。
从这个角度上讲,System V消息队列和POSIX消息队列都是优于管道和FIFO的。原因是消息队列机制中,双方是通过消息来通信的,无需花费精力从字节流中解析出完整的消息。System V消息队列比管道或FIFO优越的第二个地方在于每条消息都有type字段,消息的读取进程可以通过type字段来选择自己感兴趣的消息,也可以根据type字段来实现按消息的优先级进行读取,而不一定要按照消息生成的顺序来依次读取。
内核为每一个System V消息队列分配了一个msg_queue类型的结构体,其成员变量和各自的含义如下所示:
struct msg_queue {
struct kern_ipc_perm q_perm;
time_t q_stime; /* 上一次 msgsnd的时间*/
time_t q_rtime; /* 上一次 msgrcv的时间 */
time_t q_ctime; /* 属性变化时间 */
unsigned long q_cbytes; /* 队列当前字节总数*/
unsigned long q_qnum; /*队列当前消息总数*/
unsigned long q_qbytes; /*一个消息队列允许的最大字节数*/
pid_t q_lspid; /*上一个调用msgsnd的进程ID*/
pid_t q_lrpid; /*上一个调用msgrcv的进程ID*/
struct list_head q_messages;
struct list_head q_receivers;
struct list_head q_senders;
};
消息队列的创建或打开是由msgget函数来完成的,成功后,获得消息队列的标识符ID,函数接口定义如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
当调用成功时,返回消息队列的标识符,后续的msgsnd、msgrcv和msgctl函数都通过该标识符来操作消息队列。当函数调用失败时,返回-1,并且设置相应的errno。常见的errno如表所示:
获取到消息队列的标识符之后,可以通过调用msgsnd函数向队列中插入消息。内核会负责将消息维护在消息队列中,等待另外的进程来取走消息,从而完成通信的全过程。
msgsnd函数的定义如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
第一个参数msqid是由msgget返回的标识符ID。
第二个参数参数msgp指向用户定义的缓冲区。它的第一个成员必须是一个指定消息类型的long型,后面跟着消息文本的内容。通常其定义如下:
struct msgbuf {
long mtype; /*消息类型,必须大于0*/
char mtext[1]; /*消息体,不一定是字符数组,可以是任意结构*/
};
事实上可以是任意结构,mtext是由程序员定义的结构,其长度和内容都是由程序员控制的,只要发送方和接收方约定好即可。比如可以将结构体定义如下:
struct private_buf {
long mtype;
struct pirate_info {
/*定义你需要的成员变量*/
} info;
};
注意两点,即要对msgsnd进行错误检测和及时释放mbuf,以防止内存泄漏。
第三个参数msgsz指定了mtext字段中包含的字节数。消息队列单条消息的大小是有上限的,,上限值为MSGMAX,记录在/proc/sys/kernel/msgmax中:
cat /proc/sys/kernel/msgmax
8192
sysctl kernel.msgmax
kernel.msgmax = 8192
如果消息的长度超过了MSGMAX,那么msgsnd函数返回-1,并置errno为EINVAL。
最后一个参数msgflg是一组标志位的位掩码,用于控制msgsnd的行为。目前只定义了IPC_NOWAIT一个标志位。
IPC_NOWAIT表示执行一个无阻塞的发送操作。当没有设置IPC_NOWAIT标志位时,如果消息队列满了,那么msgsnd函数就会陷入阻塞,直到队列有足够的空间来存放这条消息为止。但是如果设置了IPC_NOWAIT标志位,那么msgsnd函数就不会陷入阻塞了,而是立刻返回失败,并置errno为EAGAIN。
返回值和常见错误:
msgsnd函数不同于文件的write函数,write函数操作的是字节流,存在部分成功的概念,所以成功时,返回的是写入的字节个数;但是msgsnd函数操作的是封装好的消息,不成功则成仁,不存在部分成功的情况。所以其成功时,msgsnd函数返回0,失败时,msgsnd函数返回-1,并且设置errno。
有发送就要有接收,没有接收者的消息是没有意义的。System V消息队列用msgrcv函数来接收消息。
size_t msgrcv(int msqid, void *msgp, size_t msgsz,
long msgtyp,int msgflg);
其中前三个参数与msgsnd的含义是一致的。msgrcv调用进程也需要定义结构体,而结构体的定义要和发送端的定义一致,并且第一个字段必须是long类型。
第4个参数msgtyp是消息队列的精华,提取消息时,可以选择进程感兴趣的消息类型。正是基于这个参数,读取消息的顺序才无须和发送顺序一致,进而可以演化出很多用法。
第5个参数是可选标志位。msgrcv函数有3个可选标志位。
返回值:
msgrcv函数调用成功时,返回消息体的大小;失败时返回-1,并且设置errno。
System V消息队列存在一个问题,即当消息队列中有消息到来时,无法通知到某进程。消息队列的读取者进程,要么以阻塞的方式调用msgrcv函数,阻塞在消息队列上直到消息出现;要么以非阻塞(IPC_NOWAIT)的方式调用msgrcv函数,失败返回,过段时间再重试,除此以外并无好办法。阻塞或轮询,这就意味着一个进程或线程不得不无所事事,盯在该消息队列上,这给编程带来了不便。
msgctl函数可以控制消息队列的属性,其接口定义如下:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
该函数提供的功能取决cmd字段,msgctl支持的操作如图所示:
为了获取消息队列的属性信息或设置属性,必须要有一个用户态的数据结构来描述消息队列的属性信息,这个数据结构就是msqid_ds结构体(系统自带,包含头文件即可),其大部分字段和内核的msg_queue结构体相对应。注意,msqid_ds结构体中包含下面的成员变量。在编程中,只要包含了对应的头文件,就可以直接使用该结构体。
#include <sys/msg.h>
struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /*最后一次调用msgsnd的时间*/
time_t msg_rtime; /*最后一次调用msgrcv的时间 */
time_t msg_ctime; /*属性发生变化的时间*/
unsigned long __msg_cbytes; /*消息队列当前的字节总数*/
msgqnum_t msg_qnum; /*消息队列当前消息的个数*/
msglen_t msg_qbytes; /*消息队列允许的最大字节数*/
pid_t msg_lspid; /*最后一次调用msgsnd的进程ID */
pid_t msg_lrpid; /*最后一次调用msgrcv的进程ID*/
};
// 示例
strutct msqid_ds buf ; /*注意包含头文件*/
msgctl(mid,IPC_STAT,&buf); /*省略error handle*/
// 查看消息队列当前消息的个数
printf(“current # of messages in queue is %d\n”,buf.msg_qnum);
消息队列开放出了4个可以设置的属性。
设置方法一般首先调用IPC_STAT获取到当前的设置,然后修改4个属性中的某个或某几个属性,最后调用IPC_SET,代码如下所示:
strutct msqid_ds buf ; /*注意包含头文件*/
msgctl(mid,IPC_STAT,&buf); /*省略error handle*/
buf.msg_qbytes = NEW_VALUE;
msgctl(mid,IPC_SET,&buf);
IPC_RMID命令用于删除与标识符对应的消息队列。由于IPC对象并无引用计数的机制,因此只要有权限,可以说删就删,而且是立刻就删。消息队列中的所有消息都会被清除,相关的数据结构被释放,所有阻塞的msgsnd函数和msgrcv函数会被唤醒,并返回EIDRM错误。
ipcs -q // 查看系统当前的消息队列
ipcrm -q <消息队列id> // 删除消息队列id
第二个参数是sembuf类型的指针。sembuf结构体定义在sys/sem.h头文件中。一般来说,该结构体至少包含以下三个成员变量:#include <iostream> #include <sys/msg.h> #include <sys/ipc.h> #include <cstring> #include <sys/wait.h> #include <unistd.h> // 定义消息结构体 struct Message { long messageType; char messageText[100]; }; int main() { // 创建消息队列 key_t key = ftok("/tmp", 'a'); // 生成唯一键值 int msgid = msgget(key, IPC_EXCL | IPC_CREAT); int pid = fork(); if (pid < 0) { std::cout << "创建队列失败" << std::endl; return -1; } else if (pid == 0) { for (int i = 0;i < 10;i++) { // 发送消息 Message msgToSend; msgToSend.messageType = 1; // 定义消息类型 strcpy(msgToSend.messageText, "Hello, message queue!"); msgsnd(msgid, &msgToSend, sizeof(msgToSend), 0); sleep(1); } // 发送消息 Message msgToSend; msgToSend.messageType = 1; // 定义消息类型 strcpy(msgToSend.messageText, "Bye!"); msgsnd(msgid, &msgToSend, sizeof(msgToSend), 0); } else { while (1) { // 接收消息 Message msgToReceive; msgrcv(msgid, &msgToReceive, sizeof(msgToReceive), 1, 0); std::cout << "Received message: " << msgToReceive.messageText << std::endl; if (strcmp(msgToReceive.messageText, "Bye!") == 0) break; } // 删除消息队列 msgctl(msgid, IPC_RMID, NULL); int status; pid_t pc = waitpid(0, &status, WNOHANG); if (pc == 0) std::cout << "此时没有子进程退出" << std::endl; else if (WIFEXITED(status)) std::cout << "子进程: " << pc << "正常退出, 退出状态为" << WEXITSTATUS(status) << std::endl; else std::cout << "子进程: " << pc << "非正常退出" << std::endl; } return 0; } [root@Zhn 消息队列]# g++ test.cpp -o test [root@Zhn 消息队列]# ./test Received message: Hello, message queue! Received message: Hello, message queue! Received message: Hello, message queue! Received message: Hello, message queue! Received message: Hello, message queue! Received message: Hello, message queue! Received message: Hello, message queue! Received message: Hello, message queue! Received message: Hello, message queue! Received message: Hello, message queue! Received message: Bye! 此时没有子进程退出 [root@Zhn 消息队列]#
子进程输入十次后输入Bye!,父进程收到Bye!后执行退出,此时子进程还没有退出。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。