云计算百科
云计算领域专业知识百科平台

C++ 多线程与并发系统取向(六)—— 任务系统骨架:mutex + condition_variable + atomic 组合实战

一、目标:做一个“可用”的最小任务系统

我们要实现一个简化版任务系统,支持:

  • ✅ 提交任务(任意函数)

  • ✅ 工作线程自动执行

  • ✅ 无任务时阻塞等待

  • ✅ 支持安全关闭

  • ✅ 正确释放线程

你可以理解为:

线程池的前身。

二、设计思路(系统取向)

我们需要三类能力:


1️⃣ 资源保护(队列)

  • 任务队列
  • 多线程访问
  • 必须用 mutex 保护

2️⃣ 线程协作(等待任务)

  • 队列空 → 线程休眠
  • 新任务 → 唤醒线程
  • 使用 condition_variable

3️⃣ 状态控制(停止系统)

  • 停止标志
  • 使用 atomic<bool>

三、整体结构图

主线程

├── submit() → push 任务 → notify


工作线程

├── wait 等待任务
├── 取任务
├── unlock
├── 执行
└── 循环

四、完整实现(可直接运行)

1️⃣ 头文件

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <functional>

2️⃣ 任务系统类

class TaskSystem {
public:
TaskSystem(size_t threadCount)
: running_(true)
{
for (size_t i = 0; i < threadCount; ++i) {
workers_.emplace_back([this] {
this->workerLoop();
});
}
}

~TaskSystem() {
shutdown();
}

// 提交任务
void submit(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(mtx_);
tasks_.push(task);
}
cv_.notify_one();
}

// 关闭系统
void shutdown() {
running_.store(false);
cv_.notify_all();

for (auto& t : workers_) {
if (t.joinable()) {
t.join();
}
}
}

private:
void workerLoop() {
while (true) {
std::function<void()> task;

{
std::unique_lock<std::mutex> lock(mtx_);

cv_.wait(lock, [this] {
return !tasks_.empty() || !running_.load();
});

if (!running_.load() && tasks_.empty()) {
return;
}

task = tasks_.front();
tasks_.pop();
}

task(); // 在锁外执行任务
}
}

private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mtx_;
std::condition_variable cv_;
std::atomic<bool> running_;
};

五、测试代码

int main() {
TaskSystem pool(3);

for (int i = 0; i < 10; ++i) {
pool.submit([i] {
std::cout << "Task " << i
<< " running in thread "
<< std::this_thread::get_id()
<< std::endl;
});
}

std::this_thread::sleep_for(std::chrono::seconds(1));

pool.shutdown();

return 0;
}

输出类似:

Task 0 running in thread 1234
Task 1 running in thread 5678

六、关键点逐条拆解(非常重要)


1️⃣ 为什么 running_ 用 atomic?

因为:

  • 多线程读写
  • 只控制“状态”
  • 不涉及复杂资源

符合 atomic 使用场景。

2️⃣ 为什么任务执行必须在锁外?

如果这样写:

task();

放在锁内:

  • 所有线程会被阻塞
  • 并发能力消失

原则:

锁保护资源,不保护业务逻辑

3️⃣ 为什么 wait 要带 predicate?

cv_.wait(lock, condition);

防止:

  • 假唤醒
  • 丢通知

4️⃣ 为什么 shutdown 要 notify_all?

因为:

  • 可能多个线程在等待
  • 必须全部唤醒退出

七、Java 类比

Java 线程池:

ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(() -> {});
pool.shutdown();

Java 帮你封装了:

  • 任务队列
  • 条件变量
  • 状态控制

C++:

你自己写。

这就是 C++ 并发“系统级能力”的核心差异。

八、系统层面你已经掌握了什么?

现在你已经具备:

✔ 线程模型理解

谁共享数据?

✔ 资源保护

mutex + RAII

✔ 线程协作

condition_variable

✔ 状态控制

atomic

✔ 生命周期管理

join + shutdown

九、工程升级方向(后续可以做)

  • 支持有界队列
  • 支持优先级任务
  • 支持 future
  • 支持返回值
  • 支持任务取消

十、本篇总结口诀

队列用 mutex 等待用 cv 状态用 atomic 生命周期用 join

十一、下一篇(最终篇)

第七篇我们讲:

并发排障与工程纪律

  • 死锁四条件
  • 锁顺序策略
  • 竞态如何定位
  • 性能如何优化
  • 锁粒度怎么拆
  • 工程规范 checklist

这才是“系统取向”的终章。

赞(0)
未经允许不得转载:网硕互联帮助中心 » C++ 多线程与并发系统取向(六)—— 任务系统骨架:mutex + condition_variable + atomic 组合实战
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!