前言
在多线程编程的世界里,线程互斥解决了 “多个线程抢资源” 的问题,保证了临界资源的独占访问;但实际开发中,我们还需要解决 “线程间有序协作” 的问题 —— 比如生产者必须先生产,消费者才能消费;队列为空时消费者不能瞎忙活,队列为满时生产者不能硬塞数据。这种 “在保证数据安全的前提下,让线程按预定顺序执行” 的机制,就是线程同步。
如果说线程互斥是给临界资源装了 “防盗门”,那线程同步就是给线程协作装了 “交通信号灯”,让线程各司其职、互不耽误。本文作为线程同步的上篇,将从核心概念入手,深入拆解条件变量的原理与用法,再通过经典的生产者消费者模型完成实战落地。下面就让我们正式开始吧!
目录
前言
一、线程同步核心概念:搞懂 “协作” 的本质
1.1 什么是线程同步?
1.2 竞态条件:同步要解决的核心问题
1.3 同步的核心:等待与唤醒
二、条件变量:线程同步的 “等待 – 唤醒” 神器
2.1 条件变量的核心特性
2.2 条件变量的核心接口
2.2.1 初始化条件变量
方式 1:静态分配(推荐用于全局 / 静态变量)
方式 2:动态分配(用于局部 / 堆上变量)
2.2.2 销毁条件变量
2.2.3 等待条件满足(核心接口)
2.2.4 唤醒等待的线程
唤醒一个线程:pthread_cond_signal
唤醒所有线程:pthread_cond_broadcast
2.3 条件变量快速入门:简单的等待 – 唤醒案例
2.3.1 代码实现
2.3.2 编译运行
2.3.3 运行结果分析
2.4 为什么 pthread_cond_wait 必须配合互斥量?
2.4.1 核心原因 1:避免唤醒信号 “丢失”
2.4.2 核心原因 2:保证共享资源的访问安全
2.5 条件变量的使用规范:用 while 判断条件,而非 if
2.5.1 什么是伪唤醒?
2.5.2 正确的使用模板(必记)
模板 1:等待条件的线程(如消费者)
模板 2:发送信号的线程(如生产者)
2.6 条件变量的 C++ 封装:RAII 风格更优雅
2.6.1 封装代码:Cond.hpp
2.6.2 封装代码解析
2.6.3 封装后的使用示例
三、生产者消费者模型:线程同步的经典实战
3.1 生产者消费者模型的 “321” 原则(必记)
3 种关系详解:
3.2 为什么需要生产者消费者模型?
3.3 基于阻塞队列的生产者消费者模型(条件变量实现)
3.3.1 实现思路
3.3.2 阻塞队列实现:BlockQueue.hpp(模板类)
3.3.3 生产者消费者模型测试代码:main.cpp
3.3.4 编译运行与结果分析
3.4 模型扩展:支持自定义任务类型
3.4.1 自定义任务类型:Task.hpp
3.4.2 测试代码:main_task.cpp
3.4.3 运行结果
四、常见问题与避坑指南
4.1 坑 1:用 if 判断条件,而非 while
4.2 坑 2:忘记解锁或解锁时机错误
4.3 坑 3:唤醒函数使用不当
4.4 坑 4:队列容量设计不合理
4.5 坑 5:线程退出机制缺失
总结
一、线程同步核心概念:搞懂 “协作” 的本质
在学习具体工具之前,我们必须先理清几个核心概念,这是理解后续所有内容的基础。这些概念看似抽象,但结合生活场景一看就懂。
1.1 什么是线程同步?
线程同步:在保证数据安全(互斥)的前提下,让多个线程按照预先定义的顺序访问临界资源,从而避免线程饥饿、无意义轮询等问题,实现线程间的高效协作。
同步的核心是 “顺序”,互斥的核心是 “独占”;同步是在互斥的基础上实现的 —— 没有互斥的同步会导致数据竞争,没有同步的互斥只能保证数据安全,却无法实现线程间的有效协作。
生活案例:餐厅里的厨师(生产者)和服务员(消费者)。厨师做好菜(生产数据)后,服务员才能端走(消费数据);如果菜还没做好,服务员只能等待,不能空着手上菜;如果出菜口满了,厨师只能等待,不能把菜堆在地上。这里的 “出菜口” 就是临界资源,“先做菜后上菜” 的规则就是同步。
1.2 竞态条件:同步要解决的核心问题
竞态条件:由于线程执行的时序不确定,导致程序执行结果不符合预期的问题,这是线程同步需要解决的核心矛盾。
简单来说,就是 “线程执行顺序乱了,结果就错了”。比如:
- 消费者线程先执行,发现队列为空,却没有等待机制,只能不断轮询 “有没有数据”,浪费 CPU 资源;
- 生产者线程生产速度太快,队列满了还在继续生产,导致数据溢出;
- 多个线程同时修改共享的 “等待条件”(比如队列大小),导致条件判断出错。
竞态条件的本质是 “共享资源的状态变化与线程执行顺序不匹配”,而线程同步就是通过 “等待 – 唤醒” 机制,让线程的执行顺序适配共享资源的状态变化。
1.3 同步的核心:等待与唤醒
线程同步的本质的是解决 “等待 – 唤醒” 问题:
- 当线程的执行条件不满足时(如队列为空、队列满),让线程进入阻塞等待状态,主动释放 CPU 资源,避免无意义轮询;
- 当其他线程修改了共享资源,使条件满足时(如生产者生产了数据、消费者消费了数据),由该线程唤醒等待的线程,让其继续执行。
在 Linux 下,实现 “等待 – 唤醒” 的核心工具是条件变量(pthread_cond_t),它必须与互斥量配合使用,才能安全地实现线程同步。
二、条件变量:线程同步的 “等待 – 唤醒” 神器
条件变量是 POSIX 线程库提供的核心同步工具,专门用于实现线程间的 “等待 – 唤醒” 机制。它的核心思想是:线程在条件不满足时,释放持有的互斥量,进入阻塞等待;当其他线程修改了共享资源,使条件满足时,唤醒等待在该条件变量上的线程,让其重新竞争互斥量并继续执行。
2.1 条件变量的核心特性
条件变量本身不具备互斥功能,它必须与互斥量配合使用,原因有二:
条件变量的核心价值在于:让线程在条件不满足时主动放弃 CPU,而不是无意义地轮询,从而提高 CPU 利用率。
2.2 条件变量的核心接口
条件变量的核心类型是pthread_cond_t,所有操作都围绕这个类型展开,核心接口包括初始化、销毁、等待、唤醒,均在<pthread.h>头文件中声明。
2.2.1 初始化条件变量
和互斥量类似,条件变量的初始化有静态分配和动态分配两种方式,适用于不同场景。
方式 1:静态分配(推荐用于全局 / 静态变量)
使用宏PTHREAD_COND_INITIALIZER直接初始化,简单高效,无需手动销毁:
// 静态初始化条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 配合静态初始化的互斥量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
方式 2:动态分配(用于局部 / 堆上变量)
使用pthread_cond_init函数初始化,支持自定义属性,使用后需要手动销毁:
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
- 参数说明:
- cond:指向要初始化的条件变量对象的指针;
- attr:条件变量的属性,一般设为NULL(使用默认属性);
- 返回值:成功返回 0,失败返回对应的错误号(如EINVAL表示参数无效)。
2.2.2 销毁条件变量
条件变量使用完成后需要销毁,释放系统资源,核心函数是pthread_cond_destroy:
int pthread_cond_destroy(pthread_cond_t *cond);
- 参数说明:cond指向要销毁的条件变量对象;
- 返回值:成功返回 0,失败返回错误号;
- 注意事项:
- 静态初始化的条件变量(PTHREAD_COND_INITIALIZER)无需手动销毁;
- 不要销毁有线程正在等待的条件变量,否则会导致未定义行为;
- 已经销毁的条件变量,不能再被其他线程使用。
2.2.3 等待条件满足(核心接口)
让线程在指定的条件变量上阻塞等待,直到被唤醒,核心函数是pthread_cond_wait:
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
- 参数说明:
- cond:线程要等待的条件变量;
- mutex:线程当前持有的互斥量;
- 返回值:成功返回 0,失败返回错误号;
- 三个关键特性(必须牢记):
- 自动解锁:调用函数时,线程会自动释放持有的互斥量,让其他线程有机会修改共享资源;
- 阻塞等待:线程进入阻塞状态,不占用 CPU 资源,直到被其他线程唤醒;
- 重新加锁:线程被唤醒后,会重新竞争互斥量,只有竞争成功后,才会从pthread_cond_wait返回,继续执行后续代码。
简单来说,pthread_cond_wait完成了 “解锁→等待→被唤醒→重新加锁” 的原子操作,这是保证同步安全的核心。
2.2.4 唤醒等待的线程
POSIX 线程库提供了两个唤醒函数,分别用于唤醒一个线程和所有线程,满足不同的同步场景。
唤醒一个线程:pthread_cond_signal
int pthread_cond_signal(pthread_cond_t *cond);
- 功能:唤醒等待在该条件变量上的任意一个线程(由操作系统调度选择),其他线程仍处于阻塞状态;
- 适用场景:一对一同步(如单生产者单消费者),避免不必要的线程唤醒,提高效率。
唤醒所有线程:pthread_cond_broadcast
int pthread_cond_broadcast(pthread_cond_t *cond);
- 功能:唤醒等待在该条件变量上的所有线程,让它们重新竞争互斥量;
- 适用场景:一对多或多对多同步(如多生产者多消费者),确保所有等待的线程都能收到信号。
2.3 条件变量快速入门:简单的等待 – 唤醒案例
为了让大家快速理解条件变量的使用,我们写一个简单的案例:创建 2 个等待线程,主线程每隔 1 秒唤醒它们,分别测试signal和broadcast的效果。
2.3.1 代码实现
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
// 静态初始化互斥量和条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 线程执行函数:等待条件变量被唤醒
void *wait_thread(void *arg) {
char *thread_name = static_cast<char*>(arg);
while (true) {
// 1. 加锁:访问共享资源前必须保证互斥
pthread_mutex_lock(&mutex);
// 2. 等待条件变量:自动释放互斥量,被唤醒后重新加锁
std::cout << thread_name << ":条件不满足,进入等待…" << std::endl;
pthread_cond_wait(&cond, &mutex);
// 3. 被唤醒后执行:此时已重新获得互斥量
std::cout << thread_name << ":被唤醒,执行任务…" << std::endl;
// 4. 解锁:释放临界资源
pthread_mutex_unlock(&mutex);
// 模拟任务执行耗时
sleep(1);
}
return nullptr;
}
int main(void) {
pthread_t t1, t2;
// 创建两个等待线程
pthread_create(&t1, NULL, wait_thread, (void*)"线程A");
pthread_create(&t2, NULL, wait_thread, (void*)"线程B");
// 主线程:每隔1秒唤醒等待的线程
sleep(2); // 确保两个线程都已启动并进入等待
int count = 0;
while (count < 3) {
std::cout << "\\n主线程:准备唤醒等待的线程(第" << count+1 << "次)" << std::endl;
// 测试1:唤醒一个线程(pthread_cond_signal)
// pthread_cond_signal(&cond);
// 测试2:唤醒所有线程(pthread_cond_broadcast)
pthread_cond_broadcast(&cond);
sleep(2);
count++;
}
// 销毁资源(实际不会执行到,这里仅作演示)
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
return 0;
}
2.3.2 编译运行
# 编译:链接pthread库(多线程编程必备)
g++ cond_demo.cpp -o cond_demo -lpthread
# 运行
./cond_demo
2.3.3 运行结果分析
-
使用 pthread_cond_signal 时:每次只有一个线程被唤醒(线程 A 和线程 B 交替执行),输出如下:
线程A:条件不满足,进入等待…
线程B:条件不满足,进入等待…主线程:准备唤醒等待的线程(第1次)
线程A:被唤醒,执行任务…
线程B:条件不满足,进入等待…主线程:准备唤醒等待的线程(第2次)
线程B:被唤醒,执行任务…
线程A:条件不满足,进入等待… -
使用 pthread_cond_broadcast 时:每次两个线程都被唤醒,输出如下:
线程A:条件不满足,进入等待…
线程B:条件不满足,进入等待…主线程:准备唤醒等待的线程(第1次)
线程A:被唤醒,执行任务…
线程B:被唤醒,执行任务…主线程:准备唤醒等待的线程(第2次)
线程A:条件不满足,进入等待…
线程B:条件不满足,进入等待…
线程A:被唤醒,执行任务…
线程B:被唤醒,执行任务…
这个案例清晰地展示了条件变量的 “等待 – 唤醒” 机制,核心要点是:等待前必须加锁,唤醒后必须解锁,且pthread_cond_wait必须与互斥量配合使用。
2.4 为什么 pthread_cond_wait 必须配合互斥量?
这是面试和开发中的高频问题,也是理解条件变量的关键。很多人会疑惑:为什么不能先解锁,再调用pthread_cond_wait?我们从两个核心角度解答。
2.4.1 核心原因 1:避免唤醒信号 “丢失”
如果手动设计 “解锁→等待” 的逻辑(错误写法):
// 错误示例:解锁和等待不是原子操作
pthread_mutex_lock(&mutex);
while (condition == false) {
pthread_mutex_unlock(&mutex); // 手动解锁
pthread_cond_wait(&cond, NULL); // 无互斥量的等待
pthread_mutex_lock(&mutex); // 重新加锁
}
pthread_mutex_unlock(&mutex);
这种写法存在致命问题:解锁和等待不是原子操作。在pthread_mutex_unlock和pthread_cond_wait之间,可能有其他线程修改了共享资源,使条件满足并调用了pthread_cond_signal;但此时当前线程还未进入等待状态,这个唤醒信号会被永久错过,导致线程永远阻塞在pthread_cond_wait中。
而pthread_cond_wait将 “解锁” 和 “等待” 封装成原子操作,从根本上避免了信号丢失的问题。
2.4.2 核心原因 2:保证共享资源的访问安全
条件变量的等待条件(如队列是否为空、是否满)是基于共享资源的,而共享资源的访问必须保证互斥。如果没有互斥量,多个线程可能同时检查和修改等待条件,导致竞态条件。
比如:多个消费者线程同时检查 “队列是否为空”,如果没有互斥量,可能多个线程都判断队列为空并进入等待;当生产者生产一个数据后,调用signal唤醒一个线程,其他线程仍会永远等待,这就是典型的同步安全问题。
总结:互斥量保证了共享资源(等待条件)的访问安全,pthread_cond_wait的原子操作避免了唤醒信号的丢失,二者缺一不可。
2.5 条件变量的使用规范:用 while 判断条件,而非 if
在使用条件变量时,必须用while循环判断等待条件,而不是if判断,这是避免 “伪唤醒” 的关键,也是开发中的高频坑。
2.5.1 什么是伪唤醒?
伪唤醒:线程在没有被其他线程调用signal或broadcast的情况下,从pthread_cond_wait中被唤醒。这是操作系统的底层实现导致的(如信号中断、线程调度优化),是条件变量的固有特性,无法避免。
如果用if判断条件,伪唤醒会导致线程在条件不满足时继续执行,从而引发程序异常;而用while循环判断,即使发生伪唤醒,线程会重新检查条件,若条件不满足则再次进入等待,从根本上避免了伪唤醒的影响。
2.5.2 正确的使用模板(必记)
条件变量的使用有严格的规范,分为 “等待条件” 和 “发送信号” 两部分,这是通用的安全模板,无论什么场景都可以直接套用:
模板 1:等待条件的线程(如消费者)
// 1. 加锁,保证共享资源的互斥访问
pthread_mutex_lock(&mutex);
// 2. while循环判断条件,避免伪唤醒
while (条件不满足) { // 如:队列为空
pthread_cond_wait(&cond, &mutex); // 等待并自动解锁
}
// 3. 条件满足,操作临界资源
// … 业务逻辑(如从队列取数据) …
// 4. 解锁,释放临界资源
pthread_mutex_unlock(&mutex);
模板 2:发送信号的线程(如生产者)
// 1. 加锁,保证共享资源的互斥访问
pthread_mutex_lock(&mutex);
// 2. 修改共享资源,使等待条件满足
// … 业务逻辑(如向队列加数据) …
// 3. 唤醒等待的线程
pthread_cond_signal(&cond); // 或pthread_cond_broadcast
// 4. 解锁,让被唤醒的线程重新竞争互斥量
pthread_mutex_unlock(&mutex);
核心要点:
2.6 条件变量的 C++ 封装:RAII 风格更优雅
在 C 语言中,使用条件变量需要手动调用初始化、销毁、等待、唤醒函数,容易出现遗漏;而在 C++ 中,我们可以利用RAII(资源获取即初始化) 思想对条件变量进行封装,结合之前封装的互斥量,让代码更安全、更优雅,同时保证可复用性。
我们将封装的代码写在Cond.hpp头文件中,依赖之前封装的Lock.hpp(包含Mutex和LockGuard类)。
2.6.1 封装代码:Cond.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <cassert>
#include "Lock.hpp" // 引入之前封装的互斥量
// 命名空间,避免命名冲突
namespace CondModule {
using namespace LockModule; // 复用互斥量的封装
class Cond {
public:
// 禁止拷贝和赋值(条件变量不能被拷贝)
Cond(const Cond&) = delete;
Cond& operator=(const Cond&) = delete;
// 构造函数:动态初始化条件变量
Cond() {
int ret = pthread_cond_init(&_cond, nullptr);
assert(ret == 0); // 调试阶段断言,release版本自动失效
(void)ret; // 避免编译警告
}
// 等待条件变量:接收自定义的Mutex对象
void Wait(Mutex& mutex) {
// 获取原生的pthread_mutex_t指针,与底层接口兼容
int ret = pthread_cond_wait(&_cond, mutex.GetMutexOriginal());
assert(ret == 0);
(void)ret;
}
// 唤醒一个等待的线程
void Notify() {
int ret = pthread_cond_signal(&_cond);
assert(ret == 0);
(void)ret;
}
// 唤醒所有等待的线程
void NotifyAll() {
int ret = pthread_cond_broadcast(&_cond);
assert(ret == 0);
(void)ret;
}
// 析构函数:销毁条件变量
~Cond() {
int ret = pthread_cond_destroy(&_cond);
assert(ret == 0);
(void)ret;
}
private:
pthread_cond_t _cond; // 原生的条件变量对象
};
}
2.6.2 封装代码解析
2.6.3 封装后的使用示例
结合Lock.hpp和Cond.hpp,重写之前的等待 – 唤醒案例,代码更简洁安全:
#include <iostream>
#include <unistd.h>
#include "Lock.hpp"
#include "Cond.hpp"
using namespace LockModule;
using namespace CondModule;
// 自定义的互斥量和条件变量
Mutex mutex;
Cond cond;
void *wait_thread(void *arg) {
char *thread_name = static_cast<char*>(arg);
while (true) {
LockGuard lock(mutex); // RAII加锁,自动解锁
std::cout << thread_name << ":条件不满足,进入等待…" << std::endl;
cond.Wait(mutex); // 等待条件变量
std::cout << thread_name << ":被唤醒,执行任务…" << std::endl;
sleep(1);
}
return nullptr;
}
int main(void) {
pthread_t t1, t2;
pthread_create(&t1, NULL, wait_thread, (void*)"线程A");
pthread_create(&t2, NULL, wait_thread, (void*)"线程B");
sleep(2);
int count = 0;
while (count < 3) {
std::cout << "\\n主线程:准备唤醒等待的线程(第" << count+1 << "次)" << std::endl;
cond.NotifyAll(); // 唤醒所有线程
sleep(2);
count++;
}
return 0;
}
可以看到,封装后的代码无需手动加锁 / 解锁、初始化 / 销毁资源,极大地降低了出错概率,这也是 C++ 封装的核心优势。
三、生产者消费者模型:线程同步的经典实战
生产者消费者模型是线程同步的经典应用场景,几乎所有多线程协作框架(如线程池、消息队列)都基于这个模型实现。它通过一个缓冲区(如队列) 将生产者线程和消费者线程解耦,让二者各司其职、有序协作,是解决多线程协作问题的 “万能模板”。
3.1 生产者消费者模型的 “321” 原则(必记)
为了方便记忆和理解,生产者消费者模型可以总结为 “321” 原则,这是模型的核心精髓:
- 3 种关系:生产者与生产者(互斥)、消费者与消费者(互斥)、生产者与消费者(互斥 + 同步);
- 2 类角色:生产者线程(生产数据,放入缓冲区)、消费者线程(从缓冲区取数据,消费数据);
- 1 个缓冲区:作为生产者和消费者的中间介质,解耦二者,平衡生产和消费的速度。

3 种关系详解:
- 互斥:生产者和消费者不能同时操作缓冲区(保证数据安全);
- 同步:生产者生产数据后,需要通知消费者;消费者消费数据后,需要通知生产者。
3.2 为什么需要生产者消费者模型?
在没有生产者消费者模型的情况下,生产者和消费者直接交互,会存在以下问题:
而生产者消费者模型通过缓冲区解决了这些问题:
3.3 基于阻塞队列的生产者消费者模型(条件变量实现)
阻塞队列(BlockingQueue) 是实现生产者消费者模型的最佳载体,它是一种特殊的队列,具有以下特性:
- 当队列为空时,消费者从队列取数据的操作会阻塞,直到队列中有数据;
- 当队列为满时,生产者向队列存数据的操作会阻塞,直到队列中有空闲位置。

阻塞队列的阻塞特性正好契合了生产者消费者的同步需求,我们使用 “条件变量 + 互斥量” 实现阻塞队列,并基于此实现多生产者多消费者模型。
3.3.1 实现思路
- _prod_cond:生产者条件变量,队列为满时,生产者阻塞等待;
- _cons_cond:消费者条件变量,队列为空时,消费者阻塞等待;
- Enqueue:生产者接口,向队列中添加数据,队满时阻塞;
- Dequeue:消费者接口,从队列中取出数据,队空时阻塞。
3.3.2 阻塞队列实现:BlockQueue.hpp(模板类)
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <queue>
#include <pthread.h>
#include <cassert>
#include <unistd.h>
// 模板类,支持任意类型的元素(如int、自定义任务对象)
template <typename T>
class BlockQueue {
private:
// 判满:队列大小等于最大容量
bool IsFull() const {
return _queue.size() == _capacity;
}
// 判空:队列为空
bool IsEmpty() const {
return _queue.empty();
}
public:
// 构造函数:初始化队列容量和同步资源
BlockQueue(int capacity)
: _capacity(capacity), _prod_wait_cnt(0), _cons_wait_cnt(0) {
// 初始化互斥量和条件变量
int ret = pthread_mutex_init(&_mutex, nullptr);
assert(ret == 0);
ret = pthread_cond_init(&_prod_cond, nullptr);
assert(ret == 0);
ret = pthread_cond_init(&_cons_cond, nullptr);
assert(ret == 0);
(void)ret; // 避免编译警告
}
// 析构函数:销毁同步资源
~BlockQueue() {
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_prod_cond);
pthread_cond_destroy(&_cons_cond);
}
// 生产者接口:入队,队满时阻塞
void Enqueue(const T& data) {
// 1. 加锁,保证队列操作的互斥性
pthread_mutex_lock(&_mutex);
// 2. while循环判满,避免伪唤醒
while (IsFull()) {
_prod_wait_cnt++;
std::cout << "队列满(容量:" << _capacity << "),生产者[" << pthread_self() << "]进入等待…" << std::endl;
pthread_cond_wait(&_prod_cond, &_mutex); // 等待并自动解锁
_prod_wait_cnt–;
}
// 3. 队未满,生产数据(入队)
_queue.push(data);
std::cout << "生产者[" << pthread_self() << "]生产数据:" << data
<< ",队列当前大小:" << _queue.size() << std::endl;
// 4. 唤醒等待的消费者(如果有)
if (_cons_wait_cnt > 0) {
std::cout << "唤醒一个等待的消费者" << std::endl;
pthread_cond_signal(&_cons_cond);
}
// 5. 解锁
pthread_mutex_unlock(&_mutex);
}
// 消费者接口:出队,队空时阻塞
void Dequeue(T& data) {
// 1. 加锁,保证队列操作的互斥性
pthread_mutex_lock(&_mutex);
// 2. while循环判空,避免伪唤醒
while (IsEmpty()) {
_cons_wait_cnt++;
std::cout << "队列为空,消费者[" << pthread_self() << "]进入等待…" << std::endl;
pthread_cond_wait(&_cons_cond, &_mutex); // 等待并自动解锁
_cons_wait_cnt–;
}
// 3. 队非空,消费数据(出队)
data = _queue.front();
_queue.pop();
std::cout << "消费者[" << pthread_self() << "]消费数据:" << data
<< ",队列当前大小:" << _queue.size() << std::endl;
// 4. 唤醒等待的生产者(如果有)
if (_prod_wait_cnt > 0) {
std::cout << "唤醒一个等待的生产者" << std::endl;
pthread_cond_signal(&_prod_cond);
}
// 5. 解锁
pthread_mutex_unlock(&_mutex);
}
private:
std::queue<T> _queue; // 底层缓冲区:队列
int _capacity; // 队列最大容量
pthread_mutex_t _mutex; // 保护队列的互斥量
pthread_cond_t _prod_cond; // 生产者条件变量
pthread_cond_t _cons_cond; // 消费者条件变量
int _prod_wait_cnt; // 等待的生产者数量
int _cons_wait_cnt; // 等待的消费者数量
};
#endif // __BLOCK_QUEUE_HPP__
3.3.3 生产者消费者模型测试代码:main.cpp
我们创建 2 个生产者线程和 3 个消费者线程,测试阻塞队列的功能,生产者每隔 1 秒生产一个整数,消费者每隔 2 秒消费一个整数:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "BlockQueue.hpp"
// 阻塞队列:容量为5,存储int类型数据
BlockQueue<int> g_block_queue(5);
// 生产者线程执行函数
void *producer_thread(void *arg) {
int producer_id = *(static_cast<int*>(arg));
delete static_cast<int*>(arg); // 释放动态分配的参数
int data = 0;
while (true) {
// 生产数据:简单递增的整数
data++;
g_block_queue.Enqueue(data);
// 模拟生产耗时
sleep(1);
}
return nullptr;
}
// 消费者线程执行函数
void *consumer_thread(void *arg) {
int consumer_id = *(static_cast<int*>(arg));
delete static_cast<int*>(arg); // 释放动态分配的参数
while (true) {
int data = 0;
g_block_queue.Dequeue(data);
// 模拟消费耗时
sleep(2);
}
return nullptr;
}
int main(void) {
std::cout << "生产者消费者模型启动(2生产者,3消费者,队列容量5)" << std::endl;
// 创建2个生产者线程
pthread_t prod_tids[2];
for (int i = 0; i < 2; ++i) {
int *id = new int(i+1);
pthread_create(&prod_tids[i], NULL, producer_thread, static_cast<void*>(id));
}
// 创建3个消费者线程
pthread_t cons_tids[3];
for (int i = 0; i < 3; ++i) {
int *id = new int(i+1);
pthread_create(&cons_tids[i], NULL, consumer_thread, static_cast<void*>(id));
}
// 等待线程执行(实际不会退出,这里仅作演示)
for (int i = 0; i < 2; ++i) {
pthread_join(prod_tids[i], NULL);
}
for (int i = 0; i < 3; ++i) {
pthread_join(cons_tids[i], NULL);
}
return 0;
}
3.3.4 编译运行与结果分析
# 编译:链接pthread库
g++ main.cpp -o prod_cons -lpthread
# 运行
./prod_cons
运行结果(关键部分):
生产者消费者模型启动(2生产者,3消费者,队列容量5)
生产者[140709348478720]生产数据:1,队列当前大小:1
唤醒一个等待的消费者
消费者[140709331693312]消费数据:1,队列当前大小:0
唤醒一个等待的生产者
生产者[140709340086016]生产数据:1,队列当前大小:1
唤醒一个等待的消费者
生产者[140709348478720]生产数据:2,队列当前大小:2
生产者[140709340086016]生产数据:2,队列当前大小:3
生产者[140709348478720]生产数据:3,队列当前大小:4
生产者[140709340086016]生产数据:3,队列当前大小:5
生产者[140709348478720]生产数据:4,队列当前大小:5
队列满(容量:5),生产者[140709348478720]进入等待…
消费者[140709323300608]消费数据:2,队列当前大小:4
唤醒一个等待的生产者
生产者[140709348478720]生产数据:4,队列当前大小:5
队列满(容量:5),生产者[140709348478720]进入等待…
结果分析:
3.4 模型扩展:支持自定义任务类型
阻塞队列的模板设计支持任意类型的数据,除了整数,我们还可以定义自定义任务类型(如计算任务、IO 任务),让生产者消费者模型更贴近实际开发场景。
3.4.1 自定义任务类型:Task.hpp
#pragma once
#include <iostream>
#include <functional>
// 任务类型:支持任意无参无返回值的函数
using Task = std::function<void()>;
// 示例任务:加法计算
void AddTask(int a, int b) {
int result = a + b;
std::cout << "任务执行:" << a << " + " << b << " = " << result << std::endl;
}
// 示例任务:字符串打印
void PrintTask(const std::string& msg) {
std::cout << "任务执行:" << msg << std::endl;
}
3.4.2 测试代码:main_task.cpp
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
// 阻塞队列:存储Task类型
BlockQueue<Task> g_task_queue(3);
// 生产者线程:生产任务
void *task_producer(void *arg) {
int producer_id = *(static_cast<int*>(arg));
delete static_cast<int*>(arg);
int count = 0;
while (true) {
// 生产不同的任务
if (count % 2 == 0) {
// 加法任务
Task task = std::bind(AddTask, count, count+1);
g_task_queue.Enqueue(task);
std::cout << "生产者[" << producer_id << "]生产加法任务:" << count << " + " << count+1 << std::endl;
} else {
// 打印任务
std::string msg = "Hello, Producer " + std::to_string(producer_id) + ", Task " + std::to_string(count);
Task task = std::bind(PrintTask, msg);
g_task_queue.Enqueue(task);
std::cout << "生产者[" << producer_id << "]生产打印任务:" << msg << std::endl;
}
count++;
sleep(1);
}
return nullptr;
}
// 消费者线程:消费任务
void *task_consumer(void *arg) {
int consumer_id = *(static_cast<int*>(arg));
delete static_cast<int*>(arg);
while (true) {
Task task;
g_task_queue.Dequeue(task);
// 执行任务
std::cout << "消费者[" << consumer_id << "]开始执行任务" << std::endl;
task();
sleep(2);
}
return nullptr;
}
int main(void) {
std::cout << "任务型生产者消费者模型启动(2生产者,2消费者,队列容量3)" << std::endl;
// 创建2个生产者线程
pthread_t prod_tids[2];
for (int i = 0; i < 2; ++i) {
int *id = new int(i+1);
pthread_create(&prod_tids[i], NULL, task_producer, static_cast<void*>(id));
}
// 创建2个消费者线程
pthread_t cons_tids[2];
for (int i = 0; i < 2; ++i) {
int *id = new int(i+1);
pthread_create(&cons_tids[i], NULL, task_consumer, static_cast<void*>(id));
}
// 等待线程执行
for (int i = 0; i < 2; ++i) {
pthread_join(prod_tids[i], NULL);
pthread_join(cons_tids[i], NULL);
}
return 0;
}
3.4.3 运行结果
任务型生产者消费者模型启动(2生产者,2消费者,队列容量3)
生产者[1]生产加法任务:0 + 1
唤醒一个等待的消费者
消费者[1]开始执行任务
任务执行:0 + 1 = 1
生产者[2]生产打印任务:Hello, Producer 2, Task 0
生产者[1]生产打印任务:Hello, Producer 1, Task 1
生产者[2]生产加法任务:1 + 2
生产者[1]生产加法任务:2 + 3
队列满(容量:3),生产者[1]进入等待…
消费者[2]开始执行任务
任务执行:Hello, Producer 2, Task 0
唤醒一个等待的生产者
生产者[1]生产打印任务:Hello, Producer 1, Task 3
队列满(容量:3),生产者[1]进入等待…
这个扩展案例展示了生产者消费者模型的灵活性,通过自定义任务类型,可以将其应用于各种实际场景(如 Web 服务器的请求处理、后台任务调度等)。
四、常见问题与避坑指南
在使用条件变量和生产者消费者模型时,容易遇到一些问题,这里总结了高频坑和避坑技巧,帮助你写出更健壮的代码。
4.1 坑 1:用 if 判断条件,而非 while
问题:使用if判断等待条件,遇到伪唤醒时,线程会在条件不满足的情况下继续执行,导致数据错误;
解决:必须用while循环判断条件,即使被伪唤醒,也会重新检查条件,不满足则继续等待。
4.2 坑 2:忘记解锁或解锁时机错误
问题:加锁后忘记解锁,或在条件判断前解锁,导致死锁或数据竞争;
解决:
4.3 坑 3:唤醒函数使用不当
问题:在多生产者多消费者场景中,使用signal唤醒一个线程,导致部分线程永远等待;解决:
4.4 坑 4:队列容量设计不合理
问题:队列容量过大导致内存浪费,过小导致生产者频繁等待;
解决:根据实际场景调整队列容量,一般设置为 CPU 核心数的 2~4 倍,或根据生产消费速度动态调整。
4.5 坑 5:线程退出机制缺失
问题:生产者消费者线程进入无限循环,无法正常退出;
解决:
总结
本文作为线程同步的上篇,从核心概念出发,深入讲解了条件变量的原理与用法,再通过生产者消费者模型完成了实战落地。在下一篇文章中,我们将继续深入线程同步。
如果本文对你有帮助,欢迎点赞、收藏、关注!如果在学习过程中遇到了问题,或者有更好的实战案例,欢迎在评论区留言交流!
网硕互联帮助中心




评论前必须登录!
注册