1. 消息队列使用流程
1.1 概念
消息队列是进程间通信(IPC)中自带同步机制的通信方式,天然避免竞态条件,mq_receive() 会阻塞直到消息到达,无需额外同步原语。
|
数据模型 |
无结构的字节流 |
有边界的消息(自动分包) |
|
同步机制 |
需额外信号量/锁 |
内置阻塞/唤醒机制 |
|
数据持久性 |
进程退出后数据保留 |
消息在接收前持久存在 |
|
通信模式 |
多对多共享 |
点对点或广播(POSIX支持多读者) |
|
典型场景 |
高频大数据交换 |
命令/事件通知、低频小数据 |
其管理方式也被视为文件。
1.2 mq_writer.c
// mq_writer.c – 消息队列写进程(发送者)
// 编译:gcc mq_writer.c -o mq_writer -lrt
// 运行:./mq_writer
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <unistd.h>
#include <errno.h>
// ============ 配置参数 ============
#define MQ_NAME "/my_message_queue" // 必须以/开头
#define MQ_PERM 0664 // 权限:rw-rw-r–
#define MAX_MSG 10 // 队列最大消息数
#define MAX_MSG_SIZE 256 // 单条消息最大字节数
// ==================================
typedef struct {
int type; // 消息类型(POSIX标准中实际未使用,但可自定义)
char data[256]; // 消息负载
} msg_t;
int main() {
printf("\\n=== [WRITER] 消息队列写进程启动 (PID=%d) ===\\n", getpid());
// ——————————————————–
// 步骤 1: 创建消息队列属性对象
// ——————————————————–
struct mq_attr attr = {0};
attr.mq_flags = 0; // 0 = 阻塞模式(O_NONBLOCK 可设为非阻塞)
attr.mq_maxmsg = MAX_MSG; // 队列最大消息数
attr.mq_msgsize = MAX_MSG_SIZE; // 单条消息最大字节数
attr.mq_curmsgs = 0; // 当前消息数(只读,创建时忽略)
printf("[1] 配置消息队列属性: max=%d, size=%d\\n",
attr.mq_maxmsg, attr.mq_msgsize);
// ——————————————————–
// 步骤 2: 创建/打开 POSIX 消息队列
// ——————————————————–
mqd_t mq = mq_open(MQ_NAME, O_CREAT | O_EXCL | O_WRONLY, MQ_PERM, &attr);
if (mq == (mqd_t)-1) {
if (errno == EEXIST) {
printf(" 消息队列 %s 已存在,尝试打开…\\n", MQ_NAME);
mq = mq_open(MQ_NAME, O_WRONLY);
if (mq == (mqd_t)-1) {
perror("❌ mq_open (fallback) 失败");
exit(EXIT_FAILURE);
}
} else {
perror("❌ mq_open 创建消息队列失败");
exit(EXIT_FAILURE);
}
}
printf("[2] mq_open 创建消息队列成功: %s, mqd=%d\\n", MQ_NAME, mq);
// ——————————————————–
// 步骤 3: 发送多条消息(自动阻塞直到队列有空间)
// ——————————————————–
msg_t msg;
for (int i = 0; i < 3; i++) {
msg.type = i + 1;
snprintf(msg.data, sizeof(msg.data),
"Message #%d from Writer (PID=%d)", i+1, getpid());
if (mq_send(mq, (char*)&msg, sizeof(msg), 0) == -1) { // 优先级=0
perror("❌ mq_send 失败");
break;
}
printf("[3.%d] mq_send 成功: \\"%s\\"\\n", i+1, msg.data);
sleep(1); // 模拟间隔发送
}
// ——————————————————–
// 步骤 4: 发送终止信号(特殊消息)
// ——————————————————–
msg.type = 999;
strcpy(msg.data, "TERMINATE");
if (mq_send(mq, (char*)&msg, sizeof(msg), 1) == -1) { // 高优先级=1
perror("❌ 终止消息发送失败");
} else {
printf("[4] 发送高优先级终止信号 (priority=1)\\n");
}
// ——————————————————–
// 步骤 5: 关闭消息队列描述符
// ——————————————————–
if (mq_close(mq) == -1) {
perror("❌ mq_close 失败");
}
printf("[5] mq_close 关闭消息队列描述符成功\\n");
// ——————————————————–
// 步骤 6: 删除消息队列名称(创建者负责清理)
// ——————————————————–
// 注意:实际销毁需等待所有进程 mq_close 后
if (mq_unlink(MQ_NAME) == -1) {
if (errno != ENOENT) { // 可能已被其他进程 unlink
perror("❌ mq_unlink 失败");
}
} else {
printf("[6] mq_unlink 删除消息队列名称成功: %s\\n", MQ_NAME);
}
printf("=== [WRITER] 写进程正常退出 ===\\n\\n");
return 0;
}
1.3 mq_reader.c
// mq_reader.c – 消息队列读进程(接收者)
// 编译:gcc mq_reader.c -o mq_reader -lrt
// 运行:先运行 ./mq_writer,再运行 ./mq_reader
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <unistd.h>
#include <errno.h>
// ============ 配置参数(必须与 writer 一致) ============
#define MQ_NAME "/my_message_queue"
#define MAX_MSG_SIZE 256
// =====================================================
typedef struct {
int type;
char data[256];
} msg_t;
int main() {
printf(" 读进程等待 1 秒,确保写进程先创建队列…\\n");
sleep(1);
printf("\\n=== [READER] 消息队列读进程启动 (PID=%d) ===\\n", getpid());
// ——————————————————–
// 步骤 1: 打开已存在的消息队列(只读模式)
// ——————————————————–
mqd_t mq = mq_open(MQ_NAME, O_RDONLY);
if (mq == (mqd_t)-1) {
perror("❌ mq_open 打开消息队列失败(确认写进程已运行)");
exit(EXIT_FAILURE);
}
printf("[1] mq_open 打开消息队列成功: %s, mqd=%d\\n", MQ_NAME, mq);
// ——————————————————–
// 步骤 2: 获取队列属性(验证配置)
// ——————————————————–
struct mq_attr attr;
if (mq_getattr(mq, &attr) == -1) {
perror("❌ mq_getattr 失败");
} else {
printf("[2] 队列状态: 当前消息数=%ld, 最大消息数=%ld, 消息大小=%ld\\n",
attr.mq_curmsgs, attr.mq_maxmsg, attr.mq_msgsize);
}
// ——————————————————–
// 步骤 3: 循环接收消息(自动阻塞直到消息到达)
// ——————————————————–
msg_t msg;
unsigned int prio;
ssize_t bytes_read;
printf("[3] 开始接收消息(阻塞模式)…\\n");
while (1) {
// 第三个参数必须 >= mq_msgsize,否则可能截断
bytes_read = mq_receive(mq, (char*)&msg, MAX_MSG_SIZE, &prio);
if (bytes_read == -1) {
if (errno == EINTR) continue; // 被信号中断,重试
perror("❌ mq_receive 失败");
break;
}
printf("[3.%ld] 收到消息 (priority=%u, type=%d): \\"%s\\"\\n",
bytes_read, prio, msg.type, msg.data);
// 检查终止信号
if (msg.type == 999 && strcmp(msg.data, "TERMINATE") == 0) {
printf(" [!] 收到终止信号,退出循环…\\n");
break;
}
sleep(1); // 模拟处理延迟
}
// ——————————————————–
// 步骤 4: 关闭消息队列描述符
// ——————————————————–
if (mq_close(mq) == -1) {
perror("❌ mq_close 失败");
}
printf("[4] mq_close 关闭消息队列描述符成功\\n");
printf("=== [READER] 读进程正常退出 ===\\n\\n");
return 0;
}
1.4 接口
|
mq_open(name, flags, mode, attr) |
创建/打开队列 |
attr 仅创建时有效;O_NONBLOCK 设非阻塞 |
|
mq_send(mqd, msg, len, prio) |
发送消息 |
prio 范围 0~`MQ_PRIO_MAX-1`(通常32767) |
|
mq_receive(mqd, buf, size, prio) |
接收消息 |
prio 返回消息优先级;阻塞直到有消息 |
|
mq_getattr(mqd, attr) |
获取队列状态 |
attr.mq_curmsgs 查看当前消息数 |
|
mq_setattr(mqd, new, old) |
设置/获取属性 |
仅 mq_flags 可修改(切换阻塞模式) |
|
mq_close(mqd) |
关闭描述符 |
进程退出前必须调用 |
|
mq_unlink(name) |
删除队列名称 |
仅创建者调用;实际销毁需所有进程 close |
1.5 流程
┌──────────────────────────────────────────────────────────────────────┐
│ POSIX 消息队列通信流程(阻塞模式) │
└──────────────────────────────────────────────────────────────────────┘
🟦 写进程 (Sender) 🟩 读进程 (Receiver)
────────────────── ──────────────────
│ │
▼ ▼
[1] mq_open(CREAT) [1] mq_open(OPEN)
(创建队列,设 max=10) (打开已有队列)
│ │
│ ▼
│ [2] mq_getattr
│ (查看队列状态)
│ │
▼ │
[2] mq_send(msg1) ────────────────────────► [3] mq_receive()
(发送消息) 【内核自动入队】 (阻塞等待…)
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 消息队列 │ │
│ │ [msg1] │◄─────────────┘
│ └──────────────┘
│ │
▼ ▼
[3] mq_send(msg2) ───► [msg1,msg2] ───► mq_receive() 返回 msg1
│ │
│ ▼
│ 处理 msg1 (sleep 1s)
│ │
▼ ▼
[4] mq_send(TERM) ───► [msg2,TERM] ───► mq_receive() 返回 msg2
│ │
│ ▼
│ 处理 msg2
│ │
│ ▼
│ mq_receive() 返回 TERM
│ │
▼ ▼
[5] mq_close [4] 检测到终止信号
│ │
▼ ▼
[6] mq_unlink (标记删除) [5] mq_close
│ │
└──────────────────┬───────────────────────┘
▼
内核回收资源
(当所有进程 mq_close 后自动销毁)
2. 原理
2.1 生命周期
与有名信号量和共享内存一样,POSIX 消息队列是 "一切皆文件" 的理念:
- 挂载点:/dev/mqueue(tmpfs 文件系统)
- 内核对象:每个队列对应一个 inode,存储在内存中
- 用户态接口:mqd_t 本质是文件描述符(fd)
|
创建 |
mq_open(O_CREAT) |
|
标记删除 |
mq_unlink()(立即移除名称) |
|
实际销毁 |
所有进程 mq_close() 后 |
|
进程崩溃 |
内核自动 mq_close() |
|
残留风险 |
未 mq_unlink 会残留文件 |
创建者进程在 mq_open() 成功后应立即调用 mq_unlink(),确保资源自动回收。
同理,共享内存和信号量也应这样实现,unlink 仅切断新访问路径,不影响已有引用。资源销毁 = unlink 标记 + 引用计数归零
mqd_t mq = mq_open("/myq", O_CREAT | O_RDWR, 0664, &attr);
mq_unlink("/myq"); // 立即标记删除,但队列仍可用
// … 正常通信 …
mq_close(mq); // 最后一个 close 触发内核销毁
2.2 阻塞/非阻塞
// 初始创建为阻塞模式
mqd_t mq = mq_open("/myq", O_RDWR | O_CREAT, 0664, &attr);
// 运行时切换为非阻塞
struct mq_attr new_attr = {0}, old_attr;
new_attr.mq_flags = O_NONBLOCK;
mq_setattr(mq, &new_attr, &old_attr);
// 非阻塞接收(无消息立即返回 EAGAIN)
ssize_t n = mq_receive(mq, buf, size, NULL);
if (n == -1 && errno == EAGAIN) {
printf("无消息可用,继续其他工作…\\n");
}
2.3 优先级
2.3.1 原理
|
优先级范围 |
0 到 MQ_PRIO_MAX – 1 |
默认 MQ_PRIO_MAX = 32768(即 0~32767) |
|
数值 |
数值越大,优先级越高 |
prio=32767 是最高优先级,prio=0 是最低 |
|
接收顺序 |
总是返回当前最高优先级消息 |
与发送顺序无关,完全由优先级决定 |
|
同优先级 |
FIFO 顺序(先发送先接收) |
内核红黑树中同优先级消息按时间戳排序 |
|
发送接口 |
mq_send(mqd, msg, len, prio) |
prio 是独立参数,不占用消息体空间 |
|
接收接口 |
mq_receive(mqd, buf, size, &prio) |
可选获取实际优先级值 |
消息队列红黑树(按 priority 降序排列):
[prio=10] ← 根节点(非最大值)
/ \\
[prio=5] [prio=15] ← 最右节点 = 最高优先级
/ \\ \\
[prio=3] [prio=7] [prio=20] ← mq_receive() 每次取此节点
接收流程:
1. mq_receive() → rb_last() → 找到 prio=20 节点
2. 删除该节点 → 重新平衡红黑树
3. 下次接收 → rb_last() → prio=15 节点
- 插入(mq_send):O(log n)
- 删除(mq_receive):O(log n)
- 查找最大值:O(1)(通过 rb_last 直接定位最右节点)
2.3.2 避免优先级反转
// ❌ 反模式:高优先级消息依赖低优先级资源
mq_send(high_prio_mq, "需要锁A", 32767); // 高优先级消息
// 但处理该消息需等待被低优先级进程持有的锁 → 优先级反转
// ✅ 正确模式:资源隔离 + 专用队列
mqd_t emergency_mq = create_queue("/emergency", 10, 256); // 专用高优先级队列
mqd_t normal_mq = create_queue("/normal", 100, 256); // 普通队列
// 紧急消息走专用通道,不与普通消息竞争
if (is_emergency(cmd))
mq_send(emergency_mq, cmd, len, 32767);
else
mq_send(normal_mq, cmd, len, 10000);
原则:高优先级消息必须有独立资源通道,避免与低优先级消息共享队列导致隐式阻塞。
网硕互联帮助中心





评论前必须登录!
注册