一、目标:做一个“可用”的最小任务系统
我们要实现一个简化版任务系统,支持:
-
✅ 提交任务(任意函数)
-
✅ 工作线程自动执行
-
✅ 无任务时阻塞等待
-
✅ 支持安全关闭
-
✅ 正确释放线程
你可以理解为:
线程池的前身。
二、设计思路(系统取向)
我们需要三类能力:
1️⃣ 资源保护(队列)
- 任务队列
- 多线程访问
- 必须用 mutex 保护
2️⃣ 线程协作(等待任务)
- 队列空 → 线程休眠
- 新任务 → 唤醒线程
- 使用 condition_variable
3️⃣ 状态控制(停止系统)
- 停止标志
- 使用 atomic<bool>
三、整体结构图
主线程
│
├── submit() → push 任务 → notify
│
▼
工作线程
│
├── wait 等待任务
├── 取任务
├── unlock
├── 执行
└── 循环
四、完整实现(可直接运行)
1️⃣ 头文件
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <functional>
2️⃣ 任务系统类
class TaskSystem {
public:
TaskSystem(size_t threadCount)
: running_(true)
{
for (size_t i = 0; i < threadCount; ++i) {
workers_.emplace_back([this] {
this->workerLoop();
});
}
}
~TaskSystem() {
shutdown();
}
// 提交任务
void submit(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(mtx_);
tasks_.push(task);
}
cv_.notify_one();
}
// 关闭系统
void shutdown() {
running_.store(false);
cv_.notify_all();
for (auto& t : workers_) {
if (t.joinable()) {
t.join();
}
}
}
private:
void workerLoop() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] {
return !tasks_.empty() || !running_.load();
});
if (!running_.load() && tasks_.empty()) {
return;
}
task = tasks_.front();
tasks_.pop();
}
task(); // 在锁外执行任务
}
}
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mtx_;
std::condition_variable cv_;
std::atomic<bool> running_;
};
五、测试代码
int main() {
TaskSystem pool(3);
for (int i = 0; i < 10; ++i) {
pool.submit([i] {
std::cout << "Task " << i
<< " running in thread "
<< std::this_thread::get_id()
<< std::endl;
});
}
std::this_thread::sleep_for(std::chrono::seconds(1));
pool.shutdown();
return 0;
}
输出类似:
Task 0 running in thread 1234
Task 1 running in thread 5678
…
六、关键点逐条拆解(非常重要)
1️⃣ 为什么 running_ 用 atomic?
因为:
- 多线程读写
- 只控制“状态”
- 不涉及复杂资源
符合 atomic 使用场景。
2️⃣ 为什么任务执行必须在锁外?
如果这样写:
task();
放在锁内:
- 所有线程会被阻塞
- 并发能力消失
原则:
锁保护资源,不保护业务逻辑
3️⃣ 为什么 wait 要带 predicate?
cv_.wait(lock, condition);
防止:
- 假唤醒
- 丢通知
4️⃣ 为什么 shutdown 要 notify_all?
因为:
- 可能多个线程在等待
- 必须全部唤醒退出
七、Java 类比
Java 线程池:
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(() -> {});
pool.shutdown();
Java 帮你封装了:
- 任务队列
- 锁
- 条件变量
- 状态控制
C++:
你自己写。
这就是 C++ 并发“系统级能力”的核心差异。
八、系统层面你已经掌握了什么?
现在你已经具备:
✔ 线程模型理解
谁共享数据?
✔ 资源保护
mutex + RAII
✔ 线程协作
condition_variable
✔ 状态控制
atomic
✔ 生命周期管理
join + shutdown
九、工程升级方向(后续可以做)
- 支持有界队列
- 支持优先级任务
- 支持 future
- 支持返回值
- 支持任务取消
十、本篇总结口诀
队列用 mutex 等待用 cv 状态用 atomic 生命周期用 join
十一、下一篇(最终篇)
第七篇我们讲:
并发排障与工程纪律
- 死锁四条件
- 锁顺序策略
- 竞态如何定位
- 性能如何优化
- 锁粒度怎么拆
- 工程规范 checklist
这才是“系统取向”的终章。
网硕互联帮助中心







评论前必须登录!
注册