消息队列(Message Queue)是一种进程间通信(IPC)机制,它允许进程通过在队列中添加和读取消息来交换数据。与管道(命名/匿名)相比,消息队列具有结构化消息、异步通信和消息持久化等特点,更适合复杂的进程间数据交换场景。
核心特性
消息结构化
每个消息都有一个类型标识(通常是整数)和数据内容,接收进程可以根据类型选择性读取消息,而无需按顺序处理所有数据。
异步通信
发送进程发送消息后无需等待接收进程立即处理,可继续执行其他操作;接收进程可在需要时读取消息,两者无需同步运行。
消息持久化
消息存储在内核空间,即使发送进程退出,消息也会保留在队列中,直到被接收进程读取或手动删除。
多进程交互
多个进程可以向同一消息队列发送消息,也可以从队列中读取消息(通过类型筛选实现一对一、一对多通信)。
消息队列的使用(System V 消息队列,Linux 为例)
System V 消息队列是最常用的实现,通过以下系统调用操作:
- msgget():创建或获取消息队列
- msgsnd():发送消息到队列
- msgrcv():从队列接收消息
- msgctl():控制消息队列(如删除、获取状态)
1. 消息结构定义
消息需要按固定格式定义,包含类型和数据:
#include <sys/msg.h>
// 消息结构(必须以 long 类型的 mtype 开头)
struct msgbuf {
long mtype; // 消息类型(>0)
char mtext[1024]; // 消息数据(可自定义大小和类型)
};
2. 创建/获取消息队列(msgget)
// 创建或获取消息队列,返回队列 ID
int msgid = msgget(key_t key, int flags);
- key:用于标识消息队列的键值(可通过 ftok() 生成唯一键)
- flags:创建权限和操作标志(如 IPC_CREAT | 0666 表示创建队列,权限为 666)
3. 发送消息(msgsnd)
// 向队列发送消息,成功返回 0,失败返回 -1
int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg);
- msgid:消息队列 ID
- msgp:指向消息结构的指针
- msgsz:消息数据部分(mtext)的长度
- msgflg:发送标志(0 表示阻塞,IPC_NOWAIT 表示非阻塞)
4. 接收消息(msgrcv)
// 从队列接收消息,成功返回接收的字节数,失败返回 -1
ssize_t msgrcv(int msgid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
- msgtyp:指定接收的消息类型(0 接收任意类型,>0 接收指定类型,<0 接收小于等于其绝对值的类型)
- 其他参数同 msgsnd
5. 控制消息队列(msgctl)
// 控制消息队列(如删除),成功返回 0,失败返回 -1
int msgctl(int msgid, int cmd, struct msqid_ds *buf);
- cmd:操作命令(IPC_RMID 表示删除队列)
完整示例
发送进程(sender.c)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/msg.h>
#include <sys/ipc.h>
struct msgbuf {
long mtype;
char mtext[1024];
};
int main() {
// 生成唯一键(文件路径和项目ID需与接收进程一致)
key_t key = ftok("/tmp", 'A');
if (key == –1) {
perror("ftok failed");
exit(1);
}
// 创建或获取消息队列
int msgid = msgget(key, IPC_CREAT | 0666);
if (msgid == –1) {
perror("msgget failed");
exit(1);
}
// 准备消息(类型为 1,数据为 "Hello, receiver!")
struct msgbuf msg;
msg.mtype = 1;
strcpy(msg.mtext, "Hello, receiver!");
// 发送消息
if (msgsnd(msgid, &msg, strlen(msg.mtext) + 1, 0) == –1) {
perror("msgsnd failed");
exit(1);
}
printf("发送消息: %s\\n", msg.mtext);
return 0;
}
接收进程(receiver.c)
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>
#include <sys/ipc.h>
struct msgbuf {
long mtype;
char mtext[1024];
};
int main() {
// 生成与发送进程相同的键
key_t key = ftok("/tmp", 'A');
if (key == –1) {
perror("ftok failed");
exit(1);
}
// 获取消息队列(不创建,只连接已存在的)
int msgid = msgget(key, 0666);
if (msgid == –1) {
perror("msgget failed");
exit(1);
}
// 接收消息(只接收类型为 1 的消息)
struct msgbuf msg;
ssize_t n = msgrcv(msgid, &msg, sizeof(msg.mtext), 1, 0);
if (n == –1) {
perror("msgrcv failed");
exit(1);
}
printf("接收消息: %s\\n", msg.mtext);
// 接收完成后删除消息队列(可选)
if (msgctl(msgid, IPC_RMID, NULL) == –1) {
perror("msgctl failed");
exit(1);
}
return 0;
}
运行方式:
关键注意事项
消息类型的作用
接收进程可通过 msgtyp 筛选消息,例如:
- 按优先级处理(高类型消息优先)
- 实现多进程定向通信(不同进程使用不同类型)
消息大小限制
系统对单条消息的大小有限制(可通过 msgmax 配置),超过限制会导致发送失败。
队列容量限制
消息队列的总字节数也有限制(msgmnb),满队列时发送操作会阻塞(非阻塞模式下返回错误)。
资源释放
消息队列不会自动销毁,需通过 msgctl(…, IPC_RMID, …) 手动删除,否则会残留内核中占用资源。
与其他 IPC 的对比
消息队列 | 结构化消息、异步、按类型读取 | 复杂数据交换、多进程通信 |
管道 | 流式数据、简单、顺序读取 | 简单命令交互、父子进程通信 |
共享内存 | 速度最快、直接访问内存 | 高频数据交换、大数据量传输 |
信号量 | 用于同步和互斥,不传递数据 | 控制进程对共享资源的访问 |
应用场景
- 分布式系统中的进程协作(如服务端与多个客户端的消息交互)。
- 日志收集系统(不同进程按类型发送日志,收集进程分类处理)。
- 任务调度(调度进程发送任务消息,工作进程按类型接收并执行)。
消息队列通过结构化和异步特性,简化了复杂进程间通信的设计,是中大型系统中常用的 IPC 方案。
评论前必须登录!
注册