云计算百科
云计算领域专业知识百科平台

通信:(9) 消息队列与优先级反转

1. 消息队列使用流程

1.1 概念

消息队列是进程间通信(IPC)中自带同步机制的通信方式,天然避免竞态条件,mq_receive() 会阻塞直到消息到达,无需额外同步原语。

特性

共享内存

消息队列

数据模型

无结构的字节流

有边界的消息(自动分包)

同步机制

需额外信号量/锁

内置阻塞/唤醒机制

数据持久性

进程退出后数据保留

消息在接收前持久存在

通信模式

多对多共享

点对点或广播(POSIX支持多读者)

典型场景

高频大数据交换

命令/事件通知、低频小数据

其管理方式也被视为文件。

1.2 mq_writer.c

// mq_writer.c – 消息队列写进程(发送者)
// 编译:gcc mq_writer.c -o mq_writer -lrt
// 运行:./mq_writer

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <unistd.h>
#include <errno.h>

// ============ 配置参数 ============
#define MQ_NAME "/my_message_queue" // 必须以/开头
#define MQ_PERM 0664 // 权限:rw-rw-r–
#define MAX_MSG 10 // 队列最大消息数
#define MAX_MSG_SIZE 256 // 单条消息最大字节数
// ==================================

typedef struct {
int type; // 消息类型(POSIX标准中实际未使用,但可自定义)
char data[256]; // 消息负载
} msg_t;

int main() {
printf("\\n=== [WRITER] 消息队列写进程启动 (PID=%d) ===\\n", getpid());

// ——————————————————–
// 步骤 1: 创建消息队列属性对象
// ——————————————————–
struct mq_attr attr = {0};
attr.mq_flags = 0; // 0 = 阻塞模式(O_NONBLOCK 可设为非阻塞)
attr.mq_maxmsg = MAX_MSG; // 队列最大消息数
attr.mq_msgsize = MAX_MSG_SIZE; // 单条消息最大字节数
attr.mq_curmsgs = 0; // 当前消息数(只读,创建时忽略)

printf("[1] 配置消息队列属性: max=%d, size=%d\\n",
attr.mq_maxmsg, attr.mq_msgsize);

// ——————————————————–
// 步骤 2: 创建/打开 POSIX 消息队列
// ——————————————————–
mqd_t mq = mq_open(MQ_NAME, O_CREAT | O_EXCL | O_WRONLY, MQ_PERM, &attr);
if (mq == (mqd_t)-1) {
if (errno == EEXIST) {
printf(" 消息队列 %s 已存在,尝试打开…\\n", MQ_NAME);
mq = mq_open(MQ_NAME, O_WRONLY);
if (mq == (mqd_t)-1) {
perror("❌ mq_open (fallback) 失败");
exit(EXIT_FAILURE);
}
} else {
perror("❌ mq_open 创建消息队列失败");
exit(EXIT_FAILURE);
}
}
printf("[2] mq_open 创建消息队列成功: %s, mqd=%d\\n", MQ_NAME, mq);

// ——————————————————–
// 步骤 3: 发送多条消息(自动阻塞直到队列有空间)
// ——————————————————–
msg_t msg;
for (int i = 0; i < 3; i++) {
msg.type = i + 1;
snprintf(msg.data, sizeof(msg.data),
"Message #%d from Writer (PID=%d)", i+1, getpid());

if (mq_send(mq, (char*)&msg, sizeof(msg), 0) == -1) { // 优先级=0
perror("❌ mq_send 失败");
break;
}
printf("[3.%d] mq_send 成功: \\"%s\\"\\n", i+1, msg.data);
sleep(1); // 模拟间隔发送
}

// ——————————————————–
// 步骤 4: 发送终止信号(特殊消息)
// ——————————————————–
msg.type = 999;
strcpy(msg.data, "TERMINATE");
if (mq_send(mq, (char*)&msg, sizeof(msg), 1) == -1) { // 高优先级=1
perror("❌ 终止消息发送失败");
} else {
printf("[4] 发送高优先级终止信号 (priority=1)\\n");
}

// ——————————————————–
// 步骤 5: 关闭消息队列描述符
// ——————————————————–
if (mq_close(mq) == -1) {
perror("❌ mq_close 失败");
}
printf("[5] mq_close 关闭消息队列描述符成功\\n");

// ——————————————————–
// 步骤 6: 删除消息队列名称(创建者负责清理)
// ——————————————————–
// 注意:实际销毁需等待所有进程 mq_close 后
if (mq_unlink(MQ_NAME) == -1) {
if (errno != ENOENT) { // 可能已被其他进程 unlink
perror("❌ mq_unlink 失败");
}
} else {
printf("[6] mq_unlink 删除消息队列名称成功: %s\\n", MQ_NAME);
}

printf("=== [WRITER] 写进程正常退出 ===\\n\\n");
return 0;
}

1.3 mq_reader.c

// mq_reader.c – 消息队列读进程(接收者)
// 编译:gcc mq_reader.c -o mq_reader -lrt
// 运行:先运行 ./mq_writer,再运行 ./mq_reader

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <unistd.h>
#include <errno.h>

// ============ 配置参数(必须与 writer 一致) ============
#define MQ_NAME "/my_message_queue"
#define MAX_MSG_SIZE 256
// =====================================================

typedef struct {
int type;
char data[256];
} msg_t;

int main() {
printf(" 读进程等待 1 秒,确保写进程先创建队列…\\n");
sleep(1);

printf("\\n=== [READER] 消息队列读进程启动 (PID=%d) ===\\n", getpid());

// ——————————————————–
// 步骤 1: 打开已存在的消息队列(只读模式)
// ——————————————————–
mqd_t mq = mq_open(MQ_NAME, O_RDONLY);
if (mq == (mqd_t)-1) {
perror("❌ mq_open 打开消息队列失败(确认写进程已运行)");
exit(EXIT_FAILURE);
}
printf("[1] mq_open 打开消息队列成功: %s, mqd=%d\\n", MQ_NAME, mq);

// ——————————————————–
// 步骤 2: 获取队列属性(验证配置)
// ——————————————————–
struct mq_attr attr;
if (mq_getattr(mq, &attr) == -1) {
perror("❌ mq_getattr 失败");
} else {
printf("[2] 队列状态: 当前消息数=%ld, 最大消息数=%ld, 消息大小=%ld\\n",
attr.mq_curmsgs, attr.mq_maxmsg, attr.mq_msgsize);
}

// ——————————————————–
// 步骤 3: 循环接收消息(自动阻塞直到消息到达)
// ——————————————————–
msg_t msg;
unsigned int prio;
ssize_t bytes_read;

printf("[3] 开始接收消息(阻塞模式)…\\n");
while (1) {
// 第三个参数必须 >= mq_msgsize,否则可能截断
bytes_read = mq_receive(mq, (char*)&msg, MAX_MSG_SIZE, &prio);

if (bytes_read == -1) {
if (errno == EINTR) continue; // 被信号中断,重试
perror("❌ mq_receive 失败");
break;
}

printf("[3.%ld] 收到消息 (priority=%u, type=%d): \\"%s\\"\\n",
bytes_read, prio, msg.type, msg.data);

// 检查终止信号
if (msg.type == 999 && strcmp(msg.data, "TERMINATE") == 0) {
printf(" [!] 收到终止信号,退出循环…\\n");
break;
}

sleep(1); // 模拟处理延迟
}

// ——————————————————–
// 步骤 4: 关闭消息队列描述符
// ——————————————————–
if (mq_close(mq) == -1) {
perror("❌ mq_close 失败");
}
printf("[4] mq_close 关闭消息队列描述符成功\\n");

printf("=== [READER] 读进程正常退出 ===\\n\\n");
return 0;
}

1.4 接口

函数

作用

关键参数说明

mq_open(name, flags, mode, attr)

创建/打开队列

attr 仅创建时有效;O_NONBLOCK 设非阻塞

mq_send(mqd, msg, len, prio)

发送消息

prio 范围 0~`MQ_PRIO_MAX-1`(通常32767)

mq_receive(mqd, buf, size, prio)

接收消息

prio 返回消息优先级;阻塞直到有消息

mq_getattr(mqd, attr)

获取队列状态

attr.mq_curmsgs 查看当前消息数

mq_setattr(mqd, new, old)

设置/获取属性

仅 mq_flags 可修改(切换阻塞模式)

mq_close(mqd)

关闭描述符

进程退出前必须调用

mq_unlink(name)

删除队列名称

仅创建者调用;实际销毁需所有进程 close

1.5 流程

┌──────────────────────────────────────────────────────────────────────┐
│ POSIX 消息队列通信流程(阻塞模式) │
└──────────────────────────────────────────────────────────────────────┘

🟦 写进程 (Sender) 🟩 读进程 (Receiver)
────────────────── ──────────────────
│ │
▼ ▼
[1] mq_open(CREAT) [1] mq_open(OPEN)
(创建队列,设 max=10) (打开已有队列)
│ │
│ ▼
│ [2] mq_getattr
│ (查看队列状态)
│ │
▼ │
[2] mq_send(msg1) ────────────────────────► [3] mq_receive()
(发送消息) 【内核自动入队】 (阻塞等待…)
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 消息队列 │ │
│ │ [msg1] │◄─────────────┘
│ └──────────────┘
│ │
▼ ▼
[3] mq_send(msg2) ───► [msg1,msg2] ───► mq_receive() 返回 msg1
│ │
│ ▼
│ 处理 msg1 (sleep 1s)
│ │
▼ ▼
[4] mq_send(TERM) ───► [msg2,TERM] ───► mq_receive() 返回 msg2
│ │
│ ▼
│ 处理 msg2
│ │
│ ▼
│ mq_receive() 返回 TERM
│ │
▼ ▼
[5] mq_close [4] 检测到终止信号
│ │
▼ ▼
[6] mq_unlink (标记删除) [5] mq_close
│ │
└──────────────────┬───────────────────────┘

内核回收资源
(当所有进程 mq_close 后自动销毁)

2. 原理

2.1 生命周期

与有名信号量和共享内存一样,POSIX 消息队列是 "一切皆文件" 的理念:

  • 挂载点:/dev/mqueue(tmpfs 文件系统)
  • 内核对象:每个队列对应一个 inode,存储在内存中
  • 用户态接口:mqd_t 本质是文件描述符(fd)

操作

POSIX 消息队列

创建

mq_open(O_CREAT)

标记删除

mq_unlink()(立即移除名称)

实际销毁

所有进程 mq_close() 后

进程崩溃

内核自动 mq_close()

残留风险

未 mq_unlink 会残留文件

创建者进程在 mq_open() 成功后应立即调用 mq_unlink(),确保资源自动回收。

同理,共享内存和信号量也应这样实现,unlink 仅切断新访问路径,不影响已有引用。资源销毁 = unlink 标记 + 引用计数归零

mqd_t mq = mq_open("/myq", O_CREAT | O_RDWR, 0664, &attr);
mq_unlink("/myq"); // 立即标记删除,但队列仍可用
// … 正常通信 …
mq_close(mq); // 最后一个 close 触发内核销毁

2.2 阻塞/非阻塞

// 初始创建为阻塞模式
mqd_t mq = mq_open("/myq", O_RDWR | O_CREAT, 0664, &attr);

// 运行时切换为非阻塞
struct mq_attr new_attr = {0}, old_attr;
new_attr.mq_flags = O_NONBLOCK;
mq_setattr(mq, &new_attr, &old_attr);

// 非阻塞接收(无消息立即返回 EAGAIN)
ssize_t n = mq_receive(mq, buf, size, NULL);
if (n == -1 && errno == EAGAIN) {
printf("无消息可用,继续其他工作…\\n");
}

2.3 优先级

2.3.1 原理

项目

说明

细节

优先级范围

0 到 MQ_PRIO_MAX – 1

默认 MQ_PRIO_MAX = 32768(即 0~32767)

数值

数值越大,优先级越高

prio=32767 是最高优先级,prio=0 是最低

接收顺序

总是返回当前最高优先级消息

与发送顺序无关,完全由优先级决定

同优先级

FIFO 顺序(先发送先接收)

内核红黑树中同优先级消息按时间戳排序

发送接口

mq_send(mqd, msg, len, prio)

prio 是独立参数,不占用消息体空间

接收接口

mq_receive(mqd, buf, size, &prio)

可选获取实际优先级值

消息队列红黑树(按 priority 降序排列):
[prio=10] ← 根节点(非最大值)
/ \\
[prio=5] [prio=15] ← 最右节点 = 最高优先级
/ \\ \\
[prio=3] [prio=7] [prio=20] ← mq_receive() 每次取此节点

接收流程:
1. mq_receive() → rb_last() → 找到 prio=20 节点
2. 删除该节点 → 重新平衡红黑树
3. 下次接收 → rb_last() → prio=15 节点

  • 插入(mq_send):O(log n)
  • 删除(mq_receive):O(log n)
  • 查找最大值:O(1)(通过 rb_last 直接定位最右节点)

2.3.2 避免优先级反转

// ❌ 反模式:高优先级消息依赖低优先级资源
mq_send(high_prio_mq, "需要锁A", 32767); // 高优先级消息
// 但处理该消息需等待被低优先级进程持有的锁 → 优先级反转

// ✅ 正确模式:资源隔离 + 专用队列
mqd_t emergency_mq = create_queue("/emergency", 10, 256); // 专用高优先级队列
mqd_t normal_mq = create_queue("/normal", 100, 256); // 普通队列

// 紧急消息走专用通道,不与普通消息竞争
if (is_emergency(cmd))
mq_send(emergency_mq, cmd, len, 32767);
else
mq_send(normal_mq, cmd, len, 10000);

原则:高优先级消息必须有独立资源通道,避免与低优先级消息共享队列导致隐式阻塞。

赞(0)
未经允许不得转载:网硕互联帮助中心 » 通信:(9) 消息队列与优先级反转
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!