欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 培训 > 【线程】基于阻塞队列的生产者消费者模型

【线程】基于阻塞队列的生产者消费者模型

2025/5/22 21:56:28 来源:https://blog.csdn.net/qq_64076540/article/details/145421465  浏览:    关键词:【线程】基于阻塞队列的生产者消费者模型

文章目录

  • 1 生产者消费者模型
  • 2 阻塞队列
    • 2.1 成员变量
    • 2.2 消费者操作
    • 2.3 生产者生产
  • 3 总结

1 生产者消费者模型

在多线程环境中,生产者消费者模型是一种经典的线程同步模型,用于处理生产者线程与消费者线程之间的工作调度和资源共享问题。在这个模型中,生产者和消费者共享一个缓冲区,生产者往缓冲区中放入商品(或者数据),而消费者则从缓冲区中取出商品(或者数据)。为了确保线程安全,避免资源竞争,通常需要使用同步机制如互斥锁(mutex) 和 条件变量(condition variable)。

2 阻塞队列

阻塞队列在生产者消费者模型中是非常常见的一种设计,通过互斥锁条件变量来确保线程同步,避免数据竞争。生产者和消费者分别在合适的时机阻塞和唤醒彼此,使得生产者和消费者能平稳地进行数据的生产和消费。

2.1 成员变量

class BlockQueue
{static const int defaultnum = 20;
public:BlockQueue(int maxcap = defaultnum):_maxcap(maxcap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}
private:std::queue<T> _q;  // 队列,存储生产者生产的数据int _maxcap;  // 队列的最大容量pthread_mutex_t _mutex;  // 互斥锁,用于保护队列的访问pthread_cond_t _c_cond;  // 消费者条件变量,用于阻塞消费	者pthread_cond_t _p_cond;  // 生产者条件变量,用于阻塞生产者};
  • _q 是用于存储数据的队列。
  • _maxcap 是队列的最大容量。
  • _mutex 是互斥锁,用来保证生产者和消费者在访问队列时的互斥性。
  • _c_cond 是消费者的条件变量,当队列为空时,消费者会被阻塞,直到队列有数据。
  • _p_cond 是生产者的条件变量,当队列满时,生产者会被阻塞,直到队列有空间。

2.2 消费者操作

T pop(){//1.上锁  --> 消费的时候,需要给消费者上锁pthread_mutex_lock(&_mutex);while(_q.size() == 0){//当商品为空的时候,就阻塞消费者pthread_cond_wait(&_c_cond, &_mutex);}//3.走到这里,两种情况 : 1.队列满了  2.被唤醒T out = _q.front();_q.pop();//4.当pop之后,队列就一定没满,因此可以唤醒生产者去生产了pthread_cond_signal(&_p_cond);pthread_mutex_unlock(&_mutex);return out;}
pthread_mutex_lock(&_mutex);

1. 先上锁,保证数据的安全

 while(_q.size() == 0){//当商品为空的时候,就阻塞消费者pthread_cond_wait(&_c_cond, &_mutex);}T out = _q.front();_q.pop();

2. 上锁之后,可以进行消费,有两种情况:
case 1 : 队列为空,没有数据,则阻塞消费者
case 2: 队列不为空,进行消费

注意:这里的pthread_cond_wait(&_c_cond, &_mutex);在阻塞消费者的同时会释放mutex互斥锁,避免死锁的产生

 //当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
pthread_cond_signal(&_p_cond);

3. 消费之后,队列一定不满(至少都有一个空位,因为刚刚消费了)。所以可以唤醒生产者进行生产

 pthread_mutex_unlock(&_mutex);

4. 所有操作结束之后,释放锁,避免死锁

2.3 生产者生产

void push(const T& in){//1.上锁  --> 生产的时候,需要给生产者上锁pthread_mutex_lock(&_mutex);//2.当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态//伪唤醒的情况while(_q.size() == _maxcap){//自动唤醒  --> 释放_p_cond持有的锁,进入阻塞状态pthread_cond_wait(&_p_cond, &_mutex);}//3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒_q.push(in);pthread_cond_signal(&_c_cond);pthread_mutex_unlock(&_mutex);}
pthread_mutex_lock(&_mutex);

1. 先上锁,保证数据的安全

 //当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态//伪唤醒的情况while(_q.size() == _maxcap){//自动唤醒  --> 释放_p_cond持有的锁,进入阻塞状态pthread_cond_wait(&_p_cond, &_mutex);}//3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒_q.push(in);

2. 上锁之后,可以进行生产,有两种情况:
case 1 : 队列为满,不能继续生产,则阻塞生产者
case 2: 队列不为满,继续生产

为什么要使用while?

防止伪唤醒。

注意:这里的pthread_cond_wait(&_p_cond, &_mutex);在阻塞生产者的同时会释放mutex互斥锁,避免死锁的产生

pthread_cond_signal(&_c_cond);

3. 生产之后,队列一定不为空(至少有一个商品,因此可以继续消费)所以可以唤醒消费者进行消费

 pthread_mutex_unlock(&_mutex);

4. 所有操作结束之后,释放锁,避免死锁

3 总结

main.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>void* Consumer(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);while(true){Task t = bq->pop();std::cout << "消费了一个任务 : " << t.GetTask() << " 运算结果是 : " << t.GetResult() << "thread id : " << pthread_self() << std::endl;sleep(1);}}void* Productor(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);int x = 10, y = 20;while(true){int data1 = rand() % 10 + 1;  //控制data1为[1,10]之间usleep(10);int data2 = rand() % 10 + 1;  //控制data2为[1,10]之间char op = opers[rand() % opers.size()];  //随机选取一个运算符//构建任务Task t(data1, data2, op);bq->push(t);std::cout << "生产了一个任务 : "  << t.GetTask() << "thread id : " << pthread_self() << std::endl;sleep(1);}
}int main()
{srand(time(nullptr));BlockQueue<Task> *bq = new BlockQueue<Task>();pthread_t c[3], p[5];for (int i = 0; i < 3; ++ i){pthread_create(c + i, nullptr, Consumer, bq);}for (int i = 0; i < 5; ++ i){pthread_create(p + i, nullptr, Productor, bq);}for (int i = 0; i < 3; ++ i){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; ++ i){pthread_join(p[i], nullptr);}delete bq;return 0;
}

BlockQueue.hpp

#include <iostream>
#include <queue>
#include <pthread.h>template<class T>
class BlockQueue
{static const int defaultnum = 20;
public:BlockQueue(int maxcap = defaultnum):_maxcap(maxcap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}T pop(){//1.上锁  --> 消费的时候,需要给消费者上锁pthread_mutex_lock(&_mutex);while(_q.size() == 0){//当商品为空的时候,就阻塞消费者pthread_cond_wait(&_c_cond, &_mutex);}//3.走到这里,两种情况 : 1.队列满了  2.被唤醒T out = _q.front();_q.pop();//4.当pop之后,队列就一定没满,因此可以唤醒生产者去生产了pthread_cond_signal(&_p_cond);pthread_mutex_unlock(&_mutex);return out;}void push(const T& in){//1.上锁  --> 生产的时候,需要给生产者上锁pthread_mutex_lock(&_mutex);//2.当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态//伪唤醒的情况while(_q.size() == _maxcap){//自动唤醒  --> 释放_p_cond持有的锁,进入阻塞状态pthread_cond_wait(&_p_cond, &_mutex);}//3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒_q.push(in);pthread_cond_signal(&_c_cond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_c_cond);pthread_cond_destroy(&_p_cond);}
private:std::queue<T> _q;  //共享资源,只有一个,但是可以被当成很多个int _maxcap;  //最大值pthread_mutex_t _mutex;   //锁pthread_cond_t _c_cond;    //consumer cond 消费者条件变量pthread_cond_t _p_cond;    //productor cond 生产者条件变量
};

Task.hpp

#include <iostream>
#include <string>std::string opers = "+-*%";enum
{DivZero = 1,ModZero,Unkown
};class Task
{
public:Task(int x1, int x2, char oper):_data1(x1),_data2(x2),_oper(oper),_result(0),_exitcode(0){}void run(){switch(_oper){case '+':_result = _data1 + _data2;break;case '-':_result = _data1 - _data2;break;case '*':_result = _data1 * _data2;break;case '/':if (_data2 == 0)_exitcode = DivZero;else_result = _data1 / _data2;break;case '%':if (_data2 == 0)_exitcode = ModZero;else_result = _data1 % _data2;break;default:_exitcode = Unkown;break;}}//重载operator()void operator()(){run();}std::string GetTask(){std::string r = std::to_string(_data1);r += _oper;r += std::to_string(_data2);r += "= ?";return r;}std::string GetResult(){std::string r = std::to_string(_data1);r += _oper;r += std::to_string(_data2);r += "= ";r += std::to_string(_result);r += "[code: ";r += std::to_string(_exitcode);r += "]";return r;}~Task(){}private:int _data1;int _data2;char _oper;int _result;int _exitcode;
};

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词