云计算百科
云计算领域专业知识百科平台

【Linux系统编程】(四十三)线程同步上篇:从条件变量到生产消费模型,吃透多线程协作精髓


前言

        在多线程编程的世界里,线程互斥解决了 “多个线程抢资源” 的问题,保证了临界资源的独占访问;但实际开发中,我们还需要解决 “线程间有序协作” 的问题 —— 比如生产者必须先生产,消费者才能消费;队列为空时消费者不能瞎忙活,队列为满时生产者不能硬塞数据。这种 “在保证数据安全的前提下,让线程按预定顺序执行” 的机制,就是线程同步。

        如果说线程互斥是给临界资源装了 “防盗门”,那线程同步就是给线程协作装了 “交通信号灯”,让线程各司其职、互不耽误。本文作为线程同步的上篇,将从核心概念入手,深入拆解条件变量的原理与用法,再通过经典的生产者消费者模型完成实战落地。下面就让我们正式开始吧!       


目录

前言

一、线程同步核心概念:搞懂 “协作” 的本质

1.1 什么是线程同步?

1.2 竞态条件:同步要解决的核心问题

1.3 同步的核心:等待与唤醒

二、条件变量:线程同步的 “等待 – 唤醒” 神器

2.1 条件变量的核心特性

2.2 条件变量的核心接口

2.2.1 初始化条件变量

方式 1:静态分配(推荐用于全局 / 静态变量)

方式 2:动态分配(用于局部 / 堆上变量)

2.2.2 销毁条件变量

2.2.3 等待条件满足(核心接口)

2.2.4 唤醒等待的线程

唤醒一个线程:pthread_cond_signal

唤醒所有线程:pthread_cond_broadcast

2.3 条件变量快速入门:简单的等待 – 唤醒案例

2.3.1 代码实现

2.3.2 编译运行

2.3.3 运行结果分析

2.4 为什么 pthread_cond_wait 必须配合互斥量?

2.4.1 核心原因 1:避免唤醒信号 “丢失”

2.4.2 核心原因 2:保证共享资源的访问安全

2.5 条件变量的使用规范:用 while 判断条件,而非 if

2.5.1 什么是伪唤醒?

2.5.2 正确的使用模板(必记)

模板 1:等待条件的线程(如消费者)

模板 2:发送信号的线程(如生产者)

2.6 条件变量的 C++ 封装:RAII 风格更优雅

2.6.1 封装代码:Cond.hpp

2.6.2 封装代码解析

2.6.3 封装后的使用示例

三、生产者消费者模型:线程同步的经典实战

3.1 生产者消费者模型的 “321” 原则(必记)

3 种关系详解:

3.2 为什么需要生产者消费者模型?

3.3 基于阻塞队列的生产者消费者模型(条件变量实现)

3.3.1 实现思路

3.3.2 阻塞队列实现:BlockQueue.hpp(模板类)

3.3.3 生产者消费者模型测试代码:main.cpp

3.3.4 编译运行与结果分析

3.4 模型扩展:支持自定义任务类型

3.4.1 自定义任务类型:Task.hpp

3.4.2 测试代码:main_task.cpp

3.4.3 运行结果

四、常见问题与避坑指南

4.1 坑 1:用 if 判断条件,而非 while

4.2 坑 2:忘记解锁或解锁时机错误

4.3 坑 3:唤醒函数使用不当

4.4 坑 4:队列容量设计不合理

4.5 坑 5:线程退出机制缺失

总结


一、线程同步核心概念:搞懂 “协作” 的本质

        在学习具体工具之前,我们必须先理清几个核心概念,这是理解后续所有内容的基础。这些概念看似抽象,但结合生活场景一看就懂。

1.1 什么是线程同步?

        线程同步:在保证数据安全(互斥)的前提下,让多个线程按照预先定义的顺序访问临界资源,从而避免线程饥饿、无意义轮询等问题,实现线程间的高效协作。

        同步的核心是 “顺序”,互斥的核心是 “独占”;同步是在互斥的基础上实现的 —— 没有互斥的同步会导致数据竞争,没有同步的互斥只能保证数据安全,却无法实现线程间的有效协作。

        生活案例:餐厅里的厨师(生产者)和服务员(消费者)。厨师做好菜(生产数据)后,服务员才能端走(消费数据);如果菜还没做好,服务员只能等待,不能空着手上菜;如果出菜口满了,厨师只能等待,不能把菜堆在地上。这里的 “出菜口” 就是临界资源,“先做菜后上菜” 的规则就是同步。

1.2 竞态条件:同步要解决的核心问题

        竞态条件:由于线程执行的时序不确定,导致程序执行结果不符合预期的问题,这是线程同步需要解决的核心矛盾。

        简单来说,就是 “线程执行顺序乱了,结果就错了”。比如:

  • 消费者线程先执行,发现队列为空,却没有等待机制,只能不断轮询 “有没有数据”,浪费 CPU 资源;
  • 生产者线程生产速度太快,队列满了还在继续生产,导致数据溢出;
  • 多个线程同时修改共享的 “等待条件”(比如队列大小),导致条件判断出错。

        竞态条件的本质是 “共享资源的状态变化与线程执行顺序不匹配”,而线程同步就是通过 “等待 – 唤醒” 机制,让线程的执行顺序适配共享资源的状态变化。

1.3 同步的核心:等待与唤醒

        线程同步的本质的是解决 “等待 – 唤醒” 问题:

  • 当线程的执行条件不满足时(如队列为空、队列满),让线程进入阻塞等待状态,主动释放 CPU 资源,避免无意义轮询;
  • 当其他线程修改了共享资源,使条件满足时(如生产者生产了数据、消费者消费了数据),由该线程唤醒等待的线程,让其继续执行。

        在 Linux 下,实现 “等待 – 唤醒” 的核心工具是条件变量(pthread_cond_t),它必须与互斥量配合使用,才能安全地实现线程同步。

二、条件变量:线程同步的 “等待 – 唤醒” 神器

        条件变量是 POSIX 线程库提供的核心同步工具,专门用于实现线程间的 “等待 – 唤醒” 机制。它的核心思想是:线程在条件不满足时,释放持有的互斥量,进入阻塞等待;当其他线程修改了共享资源,使条件满足时,唤醒等待在该条件变量上的线程,让其重新竞争互斥量并继续执行。

2.1 条件变量的核心特性

        条件变量本身不具备互斥功能,它必须与互斥量配合使用,原因有二:

  • 共享资源(如队列、计数器)的访问需要互斥保护,避免多个线程同时修改;
  • 条件变量的 “等待” 操作需要与 “解锁” 操作原子化,避免唤醒信号丢失。
  •         条件变量的核心价值在于:让线程在条件不满足时主动放弃 CPU,而不是无意义地轮询,从而提高 CPU 利用率。

    2.2 条件变量的核心接口

            条件变量的核心类型是pthread_cond_t,所有操作都围绕这个类型展开,核心接口包括初始化、销毁、等待、唤醒,均在<pthread.h>头文件中声明。

    2.2.1 初始化条件变量

                    和互斥量类似,条件变量的初始化有静态分配和动态分配两种方式,适用于不同场景。

    方式 1:静态分配(推荐用于全局 / 静态变量)

            使用宏PTHREAD_COND_INITIALIZER直接初始化,简单高效,无需手动销毁:

    // 静态初始化条件变量
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    // 配合静态初始化的互斥量
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

    方式 2:动态分配(用于局部 / 堆上变量)

            使用pthread_cond_init函数初始化,支持自定义属性,使用后需要手动销毁:

    int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

    • 参数说明:
      • cond:指向要初始化的条件变量对象的指针;
      • attr:条件变量的属性,一般设为NULL(使用默认属性);
    • 返回值:成功返回 0,失败返回对应的错误号(如EINVAL表示参数无效)。

    2.2.2 销毁条件变量

            条件变量使用完成后需要销毁,释放系统资源,核心函数是pthread_cond_destroy

    int pthread_cond_destroy(pthread_cond_t *cond);

    • 参数说明:cond指向要销毁的条件变量对象;
    • 返回值:成功返回 0,失败返回错误号;
    • 注意事项:
    • 静态初始化的条件变量(PTHREAD_COND_INITIALIZER)无需手动销毁;
    • 不要销毁有线程正在等待的条件变量,否则会导致未定义行为;
    • 已经销毁的条件变量,不能再被其他线程使用。

    2.2.3 等待条件满足(核心接口)

            让线程在指定的条件变量上阻塞等待,直到被唤醒,核心函数是pthread_cond_wait

    int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

    • 参数说明:
      • cond:线程要等待的条件变量;
      • mutex:线程当前持有的互斥量;
    • 返回值:成功返回 0,失败返回错误号;
    • 三个关键特性(必须牢记):
    • 自动解锁:调用函数时,线程会自动释放持有的互斥量,让其他线程有机会修改共享资源;
    • 阻塞等待:线程进入阻塞状态,不占用 CPU 资源,直到被其他线程唤醒;
    • 重新加锁:线程被唤醒后,会重新竞争互斥量,只有竞争成功后,才会从pthread_cond_wait返回,继续执行后续代码。

            简单来说,pthread_cond_wait完成了 “解锁→等待→被唤醒→重新加锁” 的原子操作,这是保证同步安全的核心。

    2.2.4 唤醒等待的线程

            POSIX 线程库提供了两个唤醒函数,分别用于唤醒一个线程和所有线程,满足不同的同步场景。

    唤醒一个线程:pthread_cond_signal

    int pthread_cond_signal(pthread_cond_t *cond);

    • 功能:唤醒等待在该条件变量上的任意一个线程(由操作系统调度选择),其他线程仍处于阻塞状态;
    • 适用场景:一对一同步(如单生产者单消费者),避免不必要的线程唤醒,提高效率。
    唤醒所有线程:pthread_cond_broadcast

    int pthread_cond_broadcast(pthread_cond_t *cond);

    • 功能:唤醒等待在该条件变量上的所有线程,让它们重新竞争互斥量;
    • 适用场景:一对多或多对多同步(如多生产者多消费者),确保所有等待的线程都能收到信号。

    2.3 条件变量快速入门:简单的等待 – 唤醒案例

            为了让大家快速理解条件变量的使用,我们写一个简单的案例:创建 2 个等待线程,主线程每隔 1 秒唤醒它们,分别测试signalbroadcast的效果。

    2.3.1 代码实现

    #include <iostream>
    #include <cstring>
    #include <unistd.h>
    #include <pthread.h>

    // 静态初始化互斥量和条件变量
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    // 线程执行函数:等待条件变量被唤醒
    void *wait_thread(void *arg) {
    char *thread_name = static_cast<char*>(arg);
    while (true) {
    // 1. 加锁:访问共享资源前必须保证互斥
    pthread_mutex_lock(&mutex);

    // 2. 等待条件变量:自动释放互斥量,被唤醒后重新加锁
    std::cout << thread_name << ":条件不满足,进入等待…" << std::endl;
    pthread_cond_wait(&cond, &mutex);

    // 3. 被唤醒后执行:此时已重新获得互斥量
    std::cout << thread_name << ":被唤醒,执行任务…" << std::endl;

    // 4. 解锁:释放临界资源
    pthread_mutex_unlock(&mutex);

    // 模拟任务执行耗时
    sleep(1);
    }
    return nullptr;
    }

    int main(void) {
    pthread_t t1, t2;
    // 创建两个等待线程
    pthread_create(&t1, NULL, wait_thread, (void*)"线程A");
    pthread_create(&t2, NULL, wait_thread, (void*)"线程B");

    // 主线程:每隔1秒唤醒等待的线程
    sleep(2); // 确保两个线程都已启动并进入等待
    int count = 0;
    while (count < 3) {
    std::cout << "\\n主线程:准备唤醒等待的线程(第" << count+1 << "次)" << std::endl;

    // 测试1:唤醒一个线程(pthread_cond_signal)
    // pthread_cond_signal(&cond);

    // 测试2:唤醒所有线程(pthread_cond_broadcast)
    pthread_cond_broadcast(&cond);

    sleep(2);
    count++;
    }

    // 销毁资源(实际不会执行到,这里仅作演示)
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    return 0;
    }

    2.3.2 编译运行

    # 编译:链接pthread库(多线程编程必备)
    g++ cond_demo.cpp -o cond_demo -lpthread
    # 运行
    ./cond_demo

    2.3.3 运行结果分析

    • 使用 pthread_cond_signal 时:每次只有一个线程被唤醒(线程 A 和线程 B 交替执行),输出如下:

      线程A:条件不满足,进入等待…
      线程B:条件不满足,进入等待…

      主线程:准备唤醒等待的线程(第1次)
      线程A:被唤醒,执行任务…
      线程B:条件不满足,进入等待…

      主线程:准备唤醒等待的线程(第2次)
      线程B:被唤醒,执行任务…
      线程A:条件不满足,进入等待…

    • 使用 pthread_cond_broadcast 时:每次两个线程都被唤醒,输出如下:

      线程A:条件不满足,进入等待…
      线程B:条件不满足,进入等待…

      主线程:准备唤醒等待的线程(第1次)
      线程A:被唤醒,执行任务…
      线程B:被唤醒,执行任务…

      主线程:准备唤醒等待的线程(第2次)
      线程A:条件不满足,进入等待…
      线程B:条件不满足,进入等待…
      线程A:被唤醒,执行任务…
      线程B:被唤醒,执行任务…

            这个案例清晰地展示了条件变量的 “等待 – 唤醒” 机制,核心要点是:等待前必须加锁,唤醒后必须解锁,且pthread_cond_wait必须与互斥量配合使用。

    2.4 为什么 pthread_cond_wait 必须配合互斥量?

            这是面试和开发中的高频问题,也是理解条件变量的关键。很多人会疑惑:为什么不能先解锁,再调用pthread_cond_wait?我们从两个核心角度解答。

    2.4.1 核心原因 1:避免唤醒信号 “丢失”

            如果手动设计 “解锁→等待” 的逻辑(错误写法):

    // 错误示例:解锁和等待不是原子操作
    pthread_mutex_lock(&mutex);
    while (condition == false) {
    pthread_mutex_unlock(&mutex); // 手动解锁
    pthread_cond_wait(&cond, NULL); // 无互斥量的等待
    pthread_mutex_lock(&mutex); // 重新加锁
    }
    pthread_mutex_unlock(&mutex);

            这种写法存在致命问题:解锁和等待不是原子操作。在pthread_mutex_unlockpthread_cond_wait之间,可能有其他线程修改了共享资源,使条件满足并调用了pthread_cond_signal;但此时当前线程还未进入等待状态,这个唤醒信号会被永久错过,导致线程永远阻塞在pthread_cond_wait中。

            而pthread_cond_wait将 “解锁” 和 “等待” 封装成原子操作,从根本上避免了信号丢失的问题。

    2.4.2 核心原因 2:保证共享资源的访问安全

            条件变量的等待条件(如队列是否为空、是否满)是基于共享资源的,而共享资源的访问必须保证互斥。如果没有互斥量,多个线程可能同时检查和修改等待条件,导致竞态条件。

            比如:多个消费者线程同时检查 “队列是否为空”,如果没有互斥量,可能多个线程都判断队列为空并进入等待;当生产者生产一个数据后,调用signal唤醒一个线程,其他线程仍会永远等待,这就是典型的同步安全问题。

            总结:互斥量保证了共享资源(等待条件)的访问安全,pthread_cond_wait的原子操作避免了唤醒信号的丢失,二者缺一不可。

    2.5 条件变量的使用规范:用 while 判断条件,而非 if

            在使用条件变量时,必须用while循环判断等待条件,而不是if判断,这是避免 “伪唤醒” 的关键,也是开发中的高频坑。

    2.5.1 什么是伪唤醒?

            伪唤醒:线程在没有被其他线程调用signalbroadcast的情况下,从pthread_cond_wait中被唤醒。这是操作系统的底层实现导致的(如信号中断、线程调度优化),是条件变量的固有特性,无法避免。

            如果用if判断条件,伪唤醒会导致线程在条件不满足时继续执行,从而引发程序异常;而用while循环判断,即使发生伪唤醒,线程会重新检查条件,若条件不满足则再次进入等待,从根本上避免了伪唤醒的影响。

    2.5.2 正确的使用模板(必记)

            条件变量的使用有严格的规范,分为 “等待条件” 和 “发送信号” 两部分,这是通用的安全模板,无论什么场景都可以直接套用:

    模板 1:等待条件的线程(如消费者)

    // 1. 加锁,保证共享资源的互斥访问
    pthread_mutex_lock(&mutex);

    // 2. while循环判断条件,避免伪唤醒
    while (条件不满足) { // 如:队列为空
    pthread_cond_wait(&cond, &mutex); // 等待并自动解锁
    }

    // 3. 条件满足,操作临界资源
    // … 业务逻辑(如从队列取数据) …

    // 4. 解锁,释放临界资源
    pthread_mutex_unlock(&mutex);

    模板 2:发送信号的线程(如生产者)

    // 1. 加锁,保证共享资源的互斥访问
    pthread_mutex_lock(&mutex);

    // 2. 修改共享资源,使等待条件满足
    // … 业务逻辑(如向队列加数据) …

    // 3. 唤醒等待的线程
    pthread_cond_signal(&cond); // 或pthread_cond_broadcast

    // 4. 解锁,让被唤醒的线程重新竞争互斥量
    pthread_mutex_unlock(&mutex);

    核心要点:

  • 等待条件必须用while,绝对不能用if
  • 无论是等待还是发送信号,操作共享资源前必须加锁,操作完成后必须解锁;
  • 唤醒操作(signal/broadcast)可以在加锁后或解锁前执行,加锁后执行更安全(避免信号丢失)。
  • 2.6 条件变量的 C++ 封装:RAII 风格更优雅

            在 C 语言中,使用条件变量需要手动调用初始化、销毁、等待、唤醒函数,容易出现遗漏;而在 C++ 中,我们可以利用RAII(资源获取即初始化) 思想对条件变量进行封装,结合之前封装的互斥量,让代码更安全、更优雅,同时保证可复用性。

            我们将封装的代码写在Cond.hpp头文件中,依赖之前封装的Lock.hpp(包含Mutex和LockGuard类)。

    2.6.1 封装代码:Cond.hpp

    #pragma once
    #include <iostream>
    #include <pthread.h>
    #include <cassert>
    #include "Lock.hpp" // 引入之前封装的互斥量

    // 命名空间,避免命名冲突
    namespace CondModule {
    using namespace LockModule; // 复用互斥量的封装

    class Cond {
    public:
    // 禁止拷贝和赋值(条件变量不能被拷贝)
    Cond(const Cond&) = delete;
    Cond& operator=(const Cond&) = delete;

    // 构造函数:动态初始化条件变量
    Cond() {
    int ret = pthread_cond_init(&_cond, nullptr);
    assert(ret == 0); // 调试阶段断言,release版本自动失效
    (void)ret; // 避免编译警告
    }

    // 等待条件变量:接收自定义的Mutex对象
    void Wait(Mutex& mutex) {
    // 获取原生的pthread_mutex_t指针,与底层接口兼容
    int ret = pthread_cond_wait(&_cond, mutex.GetMutexOriginal());
    assert(ret == 0);
    (void)ret;
    }

    // 唤醒一个等待的线程
    void Notify() {
    int ret = pthread_cond_signal(&_cond);
    assert(ret == 0);
    (void)ret;
    }

    // 唤醒所有等待的线程
    void NotifyAll() {
    int ret = pthread_cond_broadcast(&_cond);
    assert(ret == 0);
    (void)ret;
    }

    // 析构函数:销毁条件变量
    ~Cond() {
    int ret = pthread_cond_destroy(&_cond);
    assert(ret == 0);
    (void)ret;
    }

    private:
    pthread_cond_t _cond; // 原生的条件变量对象
    };
    }

    2.6.2 封装代码解析

  • 禁止拷贝和赋值:条件变量是独占资源,通过= delete禁用拷贝构造和赋值运算符,避免误用;
  • 与自定义 Mutex 类配合:Wait方法接收自定义的Mutex对象,通过GetMutexOriginal获取原生互斥量指针,保证封装的一致性和兼容性;
  • RAII 风格:构造函数初始化条件变量,析构函数销毁,无需手动管理资源,避免遗漏;
  • 简洁接口:对外暴露Wait、Notify、NotifyAll三个核心接口,隐藏底层的 pthread 库实现,降低使用成本。
  • 2.6.3 封装后的使用示例

            结合Lock.hpp和Cond.hpp,重写之前的等待 – 唤醒案例,代码更简洁安全:

    #include <iostream>
    #include <unistd.h>
    #include "Lock.hpp"
    #include "Cond.hpp"

    using namespace LockModule;
    using namespace CondModule;

    // 自定义的互斥量和条件变量
    Mutex mutex;
    Cond cond;

    void *wait_thread(void *arg) {
    char *thread_name = static_cast<char*>(arg);
    while (true) {
    LockGuard lock(mutex); // RAII加锁,自动解锁
    std::cout << thread_name << ":条件不满足,进入等待…" << std::endl;
    cond.Wait(mutex); // 等待条件变量
    std::cout << thread_name << ":被唤醒,执行任务…" << std::endl;
    sleep(1);
    }
    return nullptr;
    }

    int main(void) {
    pthread_t t1, t2;
    pthread_create(&t1, NULL, wait_thread, (void*)"线程A");
    pthread_create(&t2, NULL, wait_thread, (void*)"线程B");

    sleep(2);
    int count = 0;
    while (count < 3) {
    std::cout << "\\n主线程:准备唤醒等待的线程(第" << count+1 << "次)" << std::endl;
    cond.NotifyAll(); // 唤醒所有线程
    sleep(2);
    count++;
    }

    return 0;
    }

            可以看到,封装后的代码无需手动加锁 / 解锁、初始化 / 销毁资源,极大地降低了出错概率,这也是 C++ 封装的核心优势。

    三、生产者消费者模型:线程同步的经典实战

            生产者消费者模型是线程同步的经典应用场景,几乎所有多线程协作框架(如线程池、消息队列)都基于这个模型实现。它通过一个缓冲区(如队列) 将生产者线程和消费者线程解耦,让二者各司其职、有序协作,是解决多线程协作问题的 “万能模板”。

    3.1 生产者消费者模型的 “321” 原则(必记)

            为了方便记忆和理解,生产者消费者模型可以总结为 “321” 原则,这是模型的核心精髓:

    • 3 种关系:生产者与生产者(互斥)、消费者与消费者(互斥)、生产者与消费者(互斥 + 同步);
    • 2 类角色:生产者线程(生产数据,放入缓冲区)、消费者线程(从缓冲区取数据,消费数据);
    • 1 个缓冲区:作为生产者和消费者的中间介质,解耦二者,平衡生产和消费的速度。

    3 种关系详解:

  • 生产者与生产者互斥:多个生产者不能同时向缓冲区写入数据(避免数据覆盖);
  • 消费者与消费者互斥:多个消费者不能同时从缓冲区读取数据(避免重复消费);
  • 生产者与消费者互斥 + 同步:
    • 互斥:生产者和消费者不能同时操作缓冲区(保证数据安全);
    • 同步:生产者生产数据后,需要通知消费者;消费者消费数据后,需要通知生产者。
  • 3.2 为什么需要生产者消费者模型?

            在没有生产者消费者模型的情况下,生产者和消费者直接交互,会存在以下问题:

  • 强耦合:生产者必须知道消费者的存在,消费者也必须知道生产者的存在,一方的修改会直接影响另一方;
  • 速度不匹配:生产者生产速度快,消费者消费速度慢,会导致生产者阻塞;反之,消费者会频繁轮询,浪费 CPU 资源;
  • 并发效率低:直接交互会导致临界资源的竞争过于激烈,降低程序的并发效率。
  •         而生产者消费者模型通过缓冲区解决了这些问题:

  • 解耦:生产者和消费者无需知道彼此的存在,只需要操作缓冲区,降低了代码的耦合度;
  • 平衡速度:缓冲区相当于 “蓄水池”,生产者生产的数据可以先存入缓冲区,消费者按需从缓冲区取数据,平衡了二者的速度;
  • 提高并发:生产者和消费者可以独立执行,只有操作缓冲区时才会竞争临界资源,提高了程序的并发效率。
  • 3.3 基于阻塞队列的生产者消费者模型(条件变量实现)

            阻塞队列(BlockingQueue) 是实现生产者消费者模型的最佳载体,它是一种特殊的队列,具有以下特性:

    • 当队列为空时,消费者从队列取数据的操作会阻塞,直到队列中有数据;
    • 当队列为满时,生产者向队列存数据的操作会阻塞,直到队列中有空闲位置。

            阻塞队列的阻塞特性正好契合了生产者消费者的同步需求,我们使用 “条件变量 + 互斥量” 实现阻塞队列,并基于此实现多生产者多消费者模型。

    3.3.1 实现思路

  • 缓冲区:使用 C++ 的std::queue作为底层缓冲区,指定队列的最大容量(上限);
  • 互斥量:保护队列的操作(入队 / 出队 / 判空 / 判满),保证同一时刻只有一个线程操作队列;
  • 条件变量:使用两个条件变量,分别实现生产者等待和消费者等待:
    • _prod_cond:生产者条件变量,队列为满时,生产者阻塞等待;
    • _cons_cond:消费者条件变量,队列为空时,消费者阻塞等待;
  • 等待计数:记录等待的生产者和消费者数量,避免无意义的唤醒,提高效率;
  • 核心接口:
    • Enqueue:生产者接口,向队列中添加数据,队满时阻塞;
    • Dequeue:消费者接口,从队列中取出数据,队空时阻塞。
  • 3.3.2 阻塞队列实现:BlockQueue.hpp(模板类)

    #ifndef __BLOCK_QUEUE_HPP__
    #define __BLOCK_QUEUE_HPP__

    #include <iostream>
    #include <queue>
    #include <pthread.h>
    #include <cassert>
    #include <unistd.h>

    // 模板类,支持任意类型的元素(如int、自定义任务对象)
    template <typename T>
    class BlockQueue {
    private:
    // 判满:队列大小等于最大容量
    bool IsFull() const {
    return _queue.size() == _capacity;
    }

    // 判空:队列为空
    bool IsEmpty() const {
    return _queue.empty();
    }

    public:
    // 构造函数:初始化队列容量和同步资源
    BlockQueue(int capacity)
    : _capacity(capacity), _prod_wait_cnt(0), _cons_wait_cnt(0) {
    // 初始化互斥量和条件变量
    int ret = pthread_mutex_init(&_mutex, nullptr);
    assert(ret == 0);
    ret = pthread_cond_init(&_prod_cond, nullptr);
    assert(ret == 0);
    ret = pthread_cond_init(&_cons_cond, nullptr);
    assert(ret == 0);
    (void)ret; // 避免编译警告
    }

    // 析构函数:销毁同步资源
    ~BlockQueue() {
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_prod_cond);
    pthread_cond_destroy(&_cons_cond);
    }

    // 生产者接口:入队,队满时阻塞
    void Enqueue(const T& data) {
    // 1. 加锁,保证队列操作的互斥性
    pthread_mutex_lock(&_mutex);

    // 2. while循环判满,避免伪唤醒
    while (IsFull()) {
    _prod_wait_cnt++;
    std::cout << "队列满(容量:" << _capacity << "),生产者[" << pthread_self() << "]进入等待…" << std::endl;
    pthread_cond_wait(&_prod_cond, &_mutex); // 等待并自动解锁
    _prod_wait_cnt–;
    }

    // 3. 队未满,生产数据(入队)
    _queue.push(data);
    std::cout << "生产者[" << pthread_self() << "]生产数据:" << data
    << ",队列当前大小:" << _queue.size() << std::endl;

    // 4. 唤醒等待的消费者(如果有)
    if (_cons_wait_cnt > 0) {
    std::cout << "唤醒一个等待的消费者" << std::endl;
    pthread_cond_signal(&_cons_cond);
    }

    // 5. 解锁
    pthread_mutex_unlock(&_mutex);
    }

    // 消费者接口:出队,队空时阻塞
    void Dequeue(T& data) {
    // 1. 加锁,保证队列操作的互斥性
    pthread_mutex_lock(&_mutex);

    // 2. while循环判空,避免伪唤醒
    while (IsEmpty()) {
    _cons_wait_cnt++;
    std::cout << "队列为空,消费者[" << pthread_self() << "]进入等待…" << std::endl;
    pthread_cond_wait(&_cons_cond, &_mutex); // 等待并自动解锁
    _cons_wait_cnt–;
    }

    // 3. 队非空,消费数据(出队)
    data = _queue.front();
    _queue.pop();
    std::cout << "消费者[" << pthread_self() << "]消费数据:" << data
    << ",队列当前大小:" << _queue.size() << std::endl;

    // 4. 唤醒等待的生产者(如果有)
    if (_prod_wait_cnt > 0) {
    std::cout << "唤醒一个等待的生产者" << std::endl;
    pthread_cond_signal(&_prod_cond);
    }

    // 5. 解锁
    pthread_mutex_unlock(&_mutex);
    }

    private:
    std::queue<T> _queue; // 底层缓冲区:队列
    int _capacity; // 队列最大容量
    pthread_mutex_t _mutex; // 保护队列的互斥量
    pthread_cond_t _prod_cond; // 生产者条件变量
    pthread_cond_t _cons_cond; // 消费者条件变量
    int _prod_wait_cnt; // 等待的生产者数量
    int _cons_wait_cnt; // 等待的消费者数量
    };

    #endif // __BLOCK_QUEUE_HPP__

    3.3.3 生产者消费者模型测试代码:main.cpp

            我们创建 2 个生产者线程和 3 个消费者线程,测试阻塞队列的功能,生产者每隔 1 秒生产一个整数,消费者每隔 2 秒消费一个整数:

    #include <iostream>
    #include <pthread.h>
    #include <unistd.h>
    #include "BlockQueue.hpp"

    // 阻塞队列:容量为5,存储int类型数据
    BlockQueue<int> g_block_queue(5);

    // 生产者线程执行函数
    void *producer_thread(void *arg) {
    int producer_id = *(static_cast<int*>(arg));
    delete static_cast<int*>(arg); // 释放动态分配的参数

    int data = 0;
    while (true) {
    // 生产数据:简单递增的整数
    data++;
    g_block_queue.Enqueue(data);

    // 模拟生产耗时
    sleep(1);
    }
    return nullptr;
    }

    // 消费者线程执行函数
    void *consumer_thread(void *arg) {
    int consumer_id = *(static_cast<int*>(arg));
    delete static_cast<int*>(arg); // 释放动态分配的参数

    while (true) {
    int data = 0;
    g_block_queue.Dequeue(data);

    // 模拟消费耗时
    sleep(2);
    }
    return nullptr;
    }

    int main(void) {
    std::cout << "生产者消费者模型启动(2生产者,3消费者,队列容量5)" << std::endl;

    // 创建2个生产者线程
    pthread_t prod_tids[2];
    for (int i = 0; i < 2; ++i) {
    int *id = new int(i+1);
    pthread_create(&prod_tids[i], NULL, producer_thread, static_cast<void*>(id));
    }

    // 创建3个消费者线程
    pthread_t cons_tids[3];
    for (int i = 0; i < 3; ++i) {
    int *id = new int(i+1);
    pthread_create(&cons_tids[i], NULL, consumer_thread, static_cast<void*>(id));
    }

    // 等待线程执行(实际不会退出,这里仅作演示)
    for (int i = 0; i < 2; ++i) {
    pthread_join(prod_tids[i], NULL);
    }
    for (int i = 0; i < 3; ++i) {
    pthread_join(cons_tids[i], NULL);
    }

    return 0;
    }

    3.3.4 编译运行与结果分析

    # 编译:链接pthread库
    g++ main.cpp -o prod_cons -lpthread
    # 运行
    ./prod_cons

    运行结果(关键部分):

    生产者消费者模型启动(2生产者,3消费者,队列容量5)
    生产者[140709348478720]生产数据:1,队列当前大小:1
    唤醒一个等待的消费者
    消费者[140709331693312]消费数据:1,队列当前大小:0
    唤醒一个等待的生产者
    生产者[140709340086016]生产数据:1,队列当前大小:1
    唤醒一个等待的消费者
    生产者[140709348478720]生产数据:2,队列当前大小:2
    生产者[140709340086016]生产数据:2,队列当前大小:3
    生产者[140709348478720]生产数据:3,队列当前大小:4
    生产者[140709340086016]生产数据:3,队列当前大小:5
    生产者[140709348478720]生产数据:4,队列当前大小:5
    队列满(容量:5),生产者[140709348478720]进入等待…
    消费者[140709323300608]消费数据:2,队列当前大小:4
    唤醒一个等待的生产者
    生产者[140709348478720]生产数据:4,队列当前大小:5
    队列满(容量:5),生产者[140709348478720]进入等待…

    结果分析:

  • 队列容量为 5,当生产者生产速度大于消费者消费速度时,队列满后生产者进入等待;
  • 消费者消费数据后,会唤醒等待的生产者,让其继续生产;
  • 队列为空时,消费者进入等待,生产者生产数据后会唤醒消费者;
  • 所有线程有序协作,没有出现数据覆盖、重复消费或队列溢出的问题,充分体现了线程同步的核心价值。
  • 3.4 模型扩展:支持自定义任务类型

            阻塞队列的模板设计支持任意类型的数据,除了整数,我们还可以定义自定义任务类型(如计算任务、IO 任务),让生产者消费者模型更贴近实际开发场景。

    3.4.1 自定义任务类型:Task.hpp

    #pragma once
    #include <iostream>
    #include <functional>

    // 任务类型:支持任意无参无返回值的函数
    using Task = std::function<void()>;

    // 示例任务:加法计算
    void AddTask(int a, int b) {
    int result = a + b;
    std::cout << "任务执行:" << a << " + " << b << " = " << result << std::endl;
    }

    // 示例任务:字符串打印
    void PrintTask(const std::string& msg) {
    std::cout << "任务执行:" << msg << std::endl;
    }

    3.4.2 测试代码:main_task.cpp

    #include <iostream>
    #include <pthread.h>
    #include <unistd.h>
    #include "BlockQueue.hpp"
    #include "Task.hpp"

    // 阻塞队列:存储Task类型
    BlockQueue<Task> g_task_queue(3);

    // 生产者线程:生产任务
    void *task_producer(void *arg) {
    int producer_id = *(static_cast<int*>(arg));
    delete static_cast<int*>(arg);

    int count = 0;
    while (true) {
    // 生产不同的任务
    if (count % 2 == 0) {
    // 加法任务
    Task task = std::bind(AddTask, count, count+1);
    g_task_queue.Enqueue(task);
    std::cout << "生产者[" << producer_id << "]生产加法任务:" << count << " + " << count+1 << std::endl;
    } else {
    // 打印任务
    std::string msg = "Hello, Producer " + std::to_string(producer_id) + ", Task " + std::to_string(count);
    Task task = std::bind(PrintTask, msg);
    g_task_queue.Enqueue(task);
    std::cout << "生产者[" << producer_id << "]生产打印任务:" << msg << std::endl;
    }

    count++;
    sleep(1);
    }
    return nullptr;
    }

    // 消费者线程:消费任务
    void *task_consumer(void *arg) {
    int consumer_id = *(static_cast<int*>(arg));
    delete static_cast<int*>(arg);

    while (true) {
    Task task;
    g_task_queue.Dequeue(task);

    // 执行任务
    std::cout << "消费者[" << consumer_id << "]开始执行任务" << std::endl;
    task();

    sleep(2);
    }
    return nullptr;
    }

    int main(void) {
    std::cout << "任务型生产者消费者模型启动(2生产者,2消费者,队列容量3)" << std::endl;

    // 创建2个生产者线程
    pthread_t prod_tids[2];
    for (int i = 0; i < 2; ++i) {
    int *id = new int(i+1);
    pthread_create(&prod_tids[i], NULL, task_producer, static_cast<void*>(id));
    }

    // 创建2个消费者线程
    pthread_t cons_tids[2];
    for (int i = 0; i < 2; ++i) {
    int *id = new int(i+1);
    pthread_create(&cons_tids[i], NULL, task_consumer, static_cast<void*>(id));
    }

    // 等待线程执行
    for (int i = 0; i < 2; ++i) {
    pthread_join(prod_tids[i], NULL);
    pthread_join(cons_tids[i], NULL);
    }

    return 0;
    }

    3.4.3 运行结果

    任务型生产者消费者模型启动(2生产者,2消费者,队列容量3)
    生产者[1]生产加法任务:0 + 1
    唤醒一个等待的消费者
    消费者[1]开始执行任务
    任务执行:0 + 1 = 1
    生产者[2]生产打印任务:Hello, Producer 2, Task 0
    生产者[1]生产打印任务:Hello, Producer 1, Task 1
    生产者[2]生产加法任务:1 + 2
    生产者[1]生产加法任务:2 + 3
    队列满(容量:3),生产者[1]进入等待…
    消费者[2]开始执行任务
    任务执行:Hello, Producer 2, Task 0
    唤醒一个等待的生产者
    生产者[1]生产打印任务:Hello, Producer 1, Task 3
    队列满(容量:3),生产者[1]进入等待…

            这个扩展案例展示了生产者消费者模型的灵活性,通过自定义任务类型,可以将其应用于各种实际场景(如 Web 服务器的请求处理、后台任务调度等)。

    四、常见问题与避坑指南

            在使用条件变量和生产者消费者模型时,容易遇到一些问题,这里总结了高频坑和避坑技巧,帮助你写出更健壮的代码。

    4.1 坑 1:用 if 判断条件,而非 while

            问题:使用if判断等待条件,遇到伪唤醒时,线程会在条件不满足的情况下继续执行,导致数据错误;

            解决:必须用while循环判断条件,即使被伪唤醒,也会重新检查条件,不满足则继续等待。

    4.2 坑 2:忘记解锁或解锁时机错误

            问题:加锁后忘记解锁,或在条件判断前解锁,导致死锁或数据竞争;

            解决:

  • 严格遵循 “加锁→操作→解锁” 的顺序;
  • 在 C++ 中使用 RAII 风格的锁(如LockGuard),自动管理锁的生命周期;
  • 条件变量的wait函数会自动解锁,唤醒后重新加锁,无需手动干预。
  • 4.3 坑 3:唤醒函数使用不当

            问题:在多生产者多消费者场景中,使用signal唤醒一个线程,导致部分线程永远等待;解决:

  • 多生产者多消费者场景使用pthread_cond_broadcast,唤醒所有等待的线程;
  • 单生产者单消费者场景使用pthread_cond_signal,提高效率。
  • 4.4 坑 4:队列容量设计不合理

            问题:队列容量过大导致内存浪费,过小导致生产者频繁等待;

            解决:根据实际场景调整队列容量,一般设置为 CPU 核心数的 2~4 倍,或根据生产消费速度动态调整。

    4.5 坑 5:线程退出机制缺失

            问题:生产者消费者线程进入无限循环,无法正常退出;

            解决:

  • 设计退出标志(如全局变量is_running),线程定期检查,收到退出信号后退出;
  • 使用信号量或条件变量通知所有线程退出,避免线程泄漏。

  • 总结

            本文作为线程同步的上篇,从核心概念出发,深入讲解了条件变量的原理与用法,再通过生产者消费者模型完成了实战落地。在下一篇文章中,我们将继续深入线程同步。

            如果本文对你有帮助,欢迎点赞、收藏、关注!如果在学习过程中遇到了问题,或者有更好的实战案例,欢迎在评论区留言交流!

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » 【Linux系统编程】(四十三)线程同步上篇:从条件变量到生产消费模型,吃透多线程协作精髓
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!