文章目录 消息队列 System V 消息队列 POSIX 消息队列 System V vs POSIX 对比 消息队列 内核维护的进程间通信(IPC)机制 以消息为单位进行数据交换(非字节流) 支持不同类型消息的分类处理 消息队列独立于创建进程存在
消息队列VS管道 特性 管道 消息队列 数据类型 字节流 结构化消息 通信方式 先进先出 支持消息类型过滤 持久性 进程结束即消失 显式删除才消失 容量 固定缓冲区 可配置消息数量
System V 消息队列 系统管理命令 # 查看所有IPC对象 ipcs# 创建消息队列(键值12) ipcmk -Q12 # 删除消息队列(ID为1) ipcrm -q1 核心函数 创建/获取消息队列 # include <sys/msg.h> int msgget ( key_t key, int msgflg) ; key:消息队列键值(IPC_PRIVATE 或 ftok() 生成)可以使用ftok()函数生成,也可以指定为IPC_PRIVATE创建一个新的消息队列 msgflg:标志位(IPC_CREAT | 0666),IPC_CREAT创建消息队列,IPC_EXCL与IPC_CREAT一起使用确保创建新的消息队列返回:成功返回消息队列标识符 (msqid),失败返回 -1 发送消息 int msgsnd ( int msqid, const void * msgp, size_t msgsz, int msgflg) ; msqid:消息队列ID msgp:消息结构体指针(必须包含long mtype) msgsz:消息数据长度(不含mtype) msgflg:标志位(0 或 IPC_NOWAIT) 接收消息 ssize_t msgrcv ( int msqid, void * msgp, size_t msgsz, long msgtyp, int msgflg) ; msqid: 消息队列标识符 msgp: 指向消息结构的指针 msgsz: 消息缓冲区的最大长度 msgtyp:消息类型选择:0:接收第一个消息>0:接收指定类型的第一个消息<0:接收类型≤|msgtyp|的最小类型的消息msgflg: 标志位,例如IPC_NOWAIT非阻塞接收返回值: 成功返回接收到的消息数据的长度,失败返回 -1 控制操作 int msgctl ( int msqid, int cmd, struct msqid_ds * buf) ; msqid: 消息队列标识符cmd:IPC_RMID:删除队列 IPC_STAT:获取状态 IPC_SET:设置属性 buf: 指向msqid_ds结构的指针返回值: 成功返回 0,失败返回 -1 消息队列通信 # include <sys/msg.h> # include <string.h> # include <stdio.h> struct msgbuf { long mtype; // 消息类型(必须 > 0) char mtext[ 100 ] ; // 消息数据 } ; int main ( ) { key_t key= ftok ( "." , 'a' ) ; // 生成键值 int msqid= msgget ( key, IPC_CREAT| 0666 ) ; struct msgbuf msg; msg. mtype= 1 ; // 设置消息类型 strcpy ( msg. mtext, "Hello from sender" ) ; //msgsnd(msqid, &msg, strlen(msg.mtext) + 1, 0); if ( msgsnd ( msqid, & msg, sizeof ( msg. mtext) , 0 ) == - 1 ) { perror ( "msgsnd" ) ; exit ( 1 ) ; } printf ( "Message sent: %s\n" , msg. mtext) ; return 0 ; } # include <sys/msg.h> # include <stdio.h> struct msgbuf { long mtype; char mtext[ 100 ] ; } ; int main ( ) { key_t key= ftok ( "." , 'a' ) ; int msqid= msgget ( key, 0666 ) ; struct msgbuf msg; // msgrcv(msqid, &msg, sizeof(msg.mtext), 1, 0); if ( msgrcv ( msqid, & msg, sizeof ( msg. mtext) , 1 , 0 ) == - 1 ) { perror ( "msgrcv" ) ; exit ( 1 ) ; } printf ( "Received: %s\n" , msg. mtext) ; msgctl ( msqid, IPC_RMID, NULL ) ; // 删除消息队列 return 0 ; } POSIX 消息队列 特点 更现代的API设计 支持优先级消息 与文件系统集成 提供更多控制选项 核心函数 创建/打开队列 # include <mqueue.h> // 仅打开已存在的消息队列 mqd_t mq_open ( const char * name, int oflag) ; // 打开或创建新的消息队列,带属性配置 mqd_t mq_open ( const char * name, int oflag, mode_t mode, struct mq_attr * attr) ; name:队列名称(以/开头,如/myqueue, 且后续不能再包含其他<font style="color:rgb(31, 35, 41);background-color:rgba(0, 0, 0, 0);">/</font>),系统级可见的,不同进程通过相同 name 访问同一个消息队列oflag:O_CREAT | O_RDONLY | O_WRONLY | O_RDWRmode:仅当 oflag 包含 O_CREAT 时有效,用于指定新消息队列的访问权限,取值与 open() 函数的权限一致(例如 0644:所有者读写、同组读、其他读),最终权限会被进程的 umask 掩码修正(实际权限 = mode & ~umask)attr:仅当 oflag 包含 O_CREAT 时有效,用于指定新消息队列的属性(若为 NULL,则使用系统默认属性),struct mq_attr 核心成员包括:mq_maxmsg:队列最大可容纳的消息数量 mq_msgsize:队列中单个消息的最大字节长度 mq_curmsgs:队列当前的消息数量(仅用于查询,创建时设置无效) 返回值:成功:返回一个有效的 消息队列描述符(mqd_t 类型),后续所有消息队列操(mq_send/mq_receive 等)均依赖该描述符 失败:返回 (mqd_t)-1,同时设置全局变量 errno 指示错误原因(例如 EEXIST:O_CREAT|O_EXCL 时队列已存在;ENOENT:队列不存在且未指定 O_CREAT) 发送消息 # include <mqueue.h> int mq_send ( mqd_t mqdes, const char * msg_ptr, size_t msg_len, unsigned int msg_prio) ; mqdes:由 mq_open() 成功返回的消息队列描述符,标识要操作的目标消息队列msg_ptr:指向要发送的消息数据缓冲区的指针,消息数据可以是任意二进制数据(无格式要求)msg_len: 指定msg_ptr指向的消息的长度,此长度必须小于或等于队列的mq_msgsize属性,允许使用零长度的消息,即0 < msg_len ≤ 消息队列的 mq_msgsize 属性msg_prio:优先级(0最低,数值越大优先级越高,越先被接收),若无需优先级,可设为 0(默认优先级)返回值:成功:返回 0,表示消息已成功写入消息队列 失败:返回 -1,同时设置 errno 指示错误原因(例如 EAGAIN:非阻塞模式下队列已满;EBADF:无效的消息队列描述符或无写权限) 接收消息 mq_receive()从消息队列描述符 mqdes 引用的消息队列中删除优先级最高的最早消息,并将其放置在msg_ptr指向的缓冲区中如果队列为空,则默认情况下,mq_receive()会阻塞,直到消息可用,或者调用被信号处理程序中断 接收最高优先级的最早消息 ssize_t mq_receive ( mqd_t mqdes, char * msg_ptr, size_t msg_len, unsigned int * msg_prio) ; mqdes:由 mq_open() 成功返回的消息队列描述符,标识要操作的目标消息队列msg_ptr:指向用于接收消息数据的缓冲区的指针,需提前分配足够大小的内存空间msg_len: 指定 msg_ptr 指向的缓冲区的大小,必须大于或等于消息队列的 mq_msgsize 属性msg_prio:指向用于存储接收消息优先级的变量指针,接收成功后,消息的优先级会被写入该变量,若不关心消息优先级,可传入 NULL返回值:成功:返回实际接收到的消息数据的字节长度(ssize_t 类型,非负整数) 失败:返回 -1,同时设置 errno 指示错误原因(例如 EAGAIN:非阻塞模式下队列为空;EMSGSIZE:接收缓冲区大小 msg_len < mq_msgsize) 关闭与删除 int mq_close ( mqd_t mqdes) ; // 关闭队列描述符 int mq_unlink ( const char * name) ; // 删除队列文件 mqdes: 由 mq_open() 成功返回的消息队列描述符,标识要关闭的消息队列 name:与 mq_open() 中一致的消息队列唯一标识名(必须以 / 开头,格式合法),标识要删除的目标消息队列 返回值:成功:返回 0,表示消息队列描述符已成功关闭/消息队列的链接已被移除 失败:返回 -1,同时设置 errno 指示错误原因 文件系统集成 在 Linux 上,消息队列是在虚拟文件系统中创建的 挂载文件系统后,可以使用通常用于文件的命令(例如,ls 和 rm )来查看和作系统上的消息队列 # 挂载消息队列文件系统 mkdir /dev/mqueuemount -t mqueue none /dev/mqueue# 查看队列信息 ls /dev/mqueue/cat /dev/mqueue/myqueue# 目录中每个文件的内容都由一行组成,其中包含有关队列的信息: # 输出:QSIZE:129 NOTIFY:2 SIGNO:0 NOTIFY_PID:8260 QSIZE : 队列中所有消息的数据字节数NOTIFY_PID : 如果该值为非零,则具有此 PID 的进程已使用**mq_notify()**来注册异步消息通知,其余字段描述通知的发生方式NOTIFY : 通知方式:0为SIGEV_SIGNAL1表示SIGEV_NONE2为SIGEV_THREADSIGNO : 用于SIGEV_SIGNAL的SIGNO信号查看消息配置 # 默认消息大小 cat/ proc/ sys/ fs/ mqueue/ msgsize_default # 默认8192 字节 # 最大消息大小 cat/ proc/ sys/ fs/ mqueue/ msgsize_max # 限制值异常处理 // 检查消息队列是否已满 if ( msgsnd ( msqid, & msg, size, IPC_NOWAIT) == - 1 ) { if ( errno== EAGAIN) { // 队列已满处理逻辑 } } // POSIX 非阻塞接收 if ( mq_receive ( mqdes, buf, size, & prio) == - 1 ) { if ( errno== EAGAIN) { // 队列为空 } } 消息队列通信 # include <mqueue.h> # include <string.h> # include <fcntl.h> # define QUEUE_NAME "/my_queue" # define MSG_PRIORITY 1 int main ( ) { mqd_t mqdes= mq_open ( QUEUE_NAME, O_WRONLY| O_CREAT, 0666 , NULL ) ; char message[ ] = "Hello, POSIX!" ; mq_send ( mqdes, message, strlen ( message) , MSG_PRIORITY) ; mq_close ( mqdes) ; return 0 ; } # include <mqueue.h> # include <stdio.h> # define QUEUE_NAME "/my_queue" # define MAX_MSG_SIZE 8192 int main ( ) { mqd_t mqdes= mq_open ( QUEUE_NAME, O_RDONLY) ; char buffer[ MAX_MSG_SIZE] ; unsigned int priority; ssize_t len= mq_receive ( mqdes, buffer, MAX_MSG_SIZE, & priority) ; buffer[ len] = '\0' ; printf ( "[优先级:%u] %s\n" , priority, buffer) ; mq_close ( mqdes) ; // mq_unlink(QUEUE_NAME); // 删除队列 return 0 ; } System V vs POSIX 对比 特性 System V POSIX API风格 较旧 较新,类似文件操作 命名方式 数字键值 路径名 消息优先级 无 有 阻塞控制 有限 更灵活 文件系统集成 无 有(/dev/mqueue) 可移植性 广泛 较新系统 默认限制 系统配置 可调节