欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > 利用生产者-消费者理论实现阻塞队列的读写

利用生产者-消费者理论实现阻塞队列的读写

2025/9/26 15:50:14 来源:https://blog.csdn.net/2301_80377335/article/details/146615156  浏览:    关键词:利用生产者-消费者理论实现阻塞队列的读写

目录

阻塞队列模型解析

成员属性

成员变量初始化

析构函数编写

其他成员函数编写

equeue/isfull

pop/isempty

pop/equeue的必要优化

main.cc测试程序编写

一个潜在的隐患(判断是否为满/空的优化)

切换多生产者-多消费者

条件变量的封装

构造函数/析构函数

wait

notify/notifyall

完整代码

使用阻塞队列传递任务

使用回调函数实现任务处理

总结


我们今天来利用生产者-消费者理论实现阻塞队列的读写。

阻塞队列模型解析

根据生产者-消费者模型理论,在我们要实现的阻塞队列中,一方线程担任消费者进行向阻塞队列读数取数据,一方担任生产者向阻塞队列写数据,阻塞队列自然是相当于存储数据的缓存的结构,我们用队列实现比较合理简单。

成员属性

我们在一个命名空间里将阻塞队列结构封装在blockqueue类里面,blockqueue类就有私有成员queue q,由于队列数据不确定所以得加入模板参数,阻塞队列要阻塞就得规定大小,所以还要一个int类型的变量表示最大容量,生产者和消费者是互斥关系,所以还需要一把锁,生产者之间和消费者之间为了分配资源合理所以各需要一个等待队列(条件变量),为什么不需要两把锁呢?,作为临界资源的阻塞队列就一个,一个临界资源配一把锁很合理,一个线程抢夺了锁其他线程进不来了满足多线程的互斥。成员变量不够后面再填!!!

private:queue<T> q; //保存数据的容器(队列), 临界资源int _cap;    //队列最大容量pthread_mutex_t _mutex;   //作为临界资源的锁pthread_cond_t _productor_cond;  //生产者条件变量pthread_cond_t _consumer_cond;   //消费者条件变量};

成员变量初始化

队列q不需要显示调用,用自己的默认构造就可以了,所以不用初始化,容量需要我们传入一下,然后给个缺省值,没传入就用缺省值,这样比较灵活,锁和条件变量是动态申请的就调用自己的init函数。

static const int gcap = 10;template<class T>class blockqueue{public:blockqueue(int cap = gcap) //给一个缺省值:_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_productor_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}

析构函数编写

队列q不需要显示调用,用自己的默认析构就可以了,所以不用析构,int类型的容量不需要析构,锁和条件变量是动态申请的就调用自己的destroy函数。

~blockqueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor_cond);pthread_cond_destroy(&_consumer_cond);}

其他成员函数编写

equeue/isfull

equeue函数作为入队函数就是写入队列的线程调用的,传入一个模板类型的值,然后push一下就可以了,咦不对,需要加锁,毕竟读和写不是并行的,要控制队列未满才可以写,所以一进equeue就加锁,判断没有满才写入,满了加入等待队列等待唤醒,退出函数之前解锁,就这么简单。这里只要解锁一次。

        bool isfull(){return q.size() == _cap;}void equeue(const T& in) //入队函数{//加锁pthread_mutex_lock(&_mutex);if (isfull()){//加入生产者的等待队列pthread_cond_wait(&_productor_cond, &_mutex);}q.push(in);//解锁pthread_mutex_unlock(&_mutex);}

pthread_cond_wait不失败的前提下,这样执行push的只能是队列未满和生产者线程被唤醒。

pop/isempty

pop函数作为出队函数就是读取队列的线程调用的,传入一个模板类型的指针,将队头元素给到这个指针指向的空间,然后再.pop(移除)一下就可以了,咦不对,需要加锁,毕竟读和写不是并行的,要控制队列不为空才可以读取,所以一进pop函数就加锁,判断不为空才读取,为空加入等待队列等待唤醒,退出函数之前解锁,就这么简单。这里只要解锁一次。

pthread_cond_wait不失败的前提下,这样执行pop的只能是队列为空和消费者线程被唤醒。

        bool isempty(){return q.empty();}void pop(T* out) //出队函数{//加锁pthread_mutex_lock(&_mutex);if (isempty()){//加入消费者的等待队列pthread_cond_wait(&_consumer_cond, &_mutex);}*out = q.front();q.pop();//解锁pthread_mutex_unlock(&_mutex);}

pop/equeue的必要优化

有个问题,既然生产者线程和消费者线程都有处在休眠等待的,如果没人唤醒不就谁也不理谁了吗,跟之前一样,当生产者写入时,队列一定不为空,这时可以直接唤醒消费者线程进行读取,反之,当消费者读取时,队列可能为空,但一定不是满的,这时可以直接唤醒生产者线程进行写入。

直接唤醒操作还得判断当前对应队列是否有线程,不然就白写了,所以加入两个int私有成员,分别记录处于生产者等待队列和消费者等待队列中线程的个数。有线程进入等待队列就++,唤醒就--。

private:queue<T> q; //保存数据的容器(队列), 临界资源int _cap;    //队列最大容量pthread_mutex_t _mutex;   //作为临界资源的锁pthread_cond_t _productor_cond;  //生产者条件变量pthread_cond_t _consumer_cond;   //消费者条件变量int _pwait_num;          //生产者等待队列内线程计数int _cwait_num;          //消费者等待队列内线程计数};
blockqueue(int cap = gcap) //给一个缺省值:_cap(cap),_pwait_num(0),_cwait_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_productor_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}

都先初始化为0。

        bool isfull(){return q.size() == _cap;}void equeue(const T& in) //入队函数{//加锁pthread_mutex_lock(&_mutex);if (isfull()){_pwait_num++;//加入生产者的等待队列pthread_cond_wait(&_productor_cond, &_mutex);_pwait_num--;}q.push(in); //写入//解锁pthread_mutex_unlock(&_mutex);if (_cwait_num){pthread_cond_signal(&_consumer_cond);}}bool isempty(){return q.empty();}void pop(T* out) //出队函数{//加锁pthread_mutex_lock(&_mutex);if (isempty()){_cwait_num++;//加入消费者的等待队列pthread_cond_wait(&_consumer_cond, &_mutex);_cwait_num--;}*out = q.front();q.pop();//解锁pthread_mutex_unlock(&_mutex);if (_pwait_num){pthread_cond_signal(&_productor_cond);}}

这里建议先解锁再唤醒对立的线程,这么做可以减少唤醒线程的锁竞争,提高性能,如果先唤醒线程的话,wait也会申请锁,如果接着的解锁速度慢于wait申请锁的速度,就变成锁的重复申请会死锁的。我试过了,唤醒先进行也不一定会死锁,运行得好好的,但是感觉先解锁更好。

main.cc测试程序编写

我们先采用单生产-单消费的形式,切换成多生产-多消费的形式其实就是多加几个线程而已,.hpp根本不需要改,因为每次进入临界资源的只能是一个线程。

我们创建了两个线程分别死循环的执行对应的方法,然后我们让消费者线程消费得慢一点,2s消费一次,生产者线程的速度没有减慢,我们可以看到肯定是消费者读取一个,生产者接着生产一个,生产者在队列满了之后等待消费者消费,整个队列始终处于相对满的状态。

生产者除了写入数据,写入的数据从哪来,需要自己造或者从另一个地方搬去,消费者也不是只读取数据,消费者读取完数据需要处理的。

#include"blockqueue.hpp"
using namespace blockqueuemodule;//消费---读取
void* consumer(void* args)
{blockqueue<int>* bq = static_cast<blockqueue<int>*>(args);while (true){sleep(2);int data;bq->pop(&data);// 做处理printf("consumer, 消费了一个数据: %d\n", data);}}//生产---写入
void* productor(void* args)
{blockqueue<int>* bq = static_cast<blockqueue<int>*>(args);int data = 10;while (true){//写到bq中bq->equeue(data);printf("producter, 生产了一个数据: %d\n", data);data++;}
}int main()
{blockqueue<int>* bq = new blockqueue<int>(6);//单生产,单消费pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

接着加入更多的提示美化等待,唤醒的过程!!!这个属于非必要的优化。

#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
using namespace std;namespace blockqueuemodule
{static const int gcap = 10;template<class T>class blockqueue{public:blockqueue(int cap = gcap) //给一个缺省值:_cap(cap),_pwait_num(0),_cwait_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_productor_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}bool isfull(){return q.size() == _cap;}void equeue(const T& in) //入队函数{//加锁pthread_mutex_lock(&_mutex);if (isfull()){cout << "生产者进入等待。。。" << endl;_pwait_num++;//加入生产者的等待队列pthread_cond_wait(&_productor_cond, &_mutex);_pwait_num--;cout << "生产者被唤醒。。。" << endl;}q.push(in); //写入//解锁pthread_mutex_unlock(&_mutex);if (_cwait_num){cout << "叫醒消费者" << endl;pthread_cond_signal(&_consumer_cond);}}bool isempty(){return q.empty();}void pop(T* out) //出队函数{//加锁pthread_mutex_lock(&_mutex);if (isempty()){cout << "消费者进入等待。。。" << endl;_cwait_num++;//加入消费者的等待队列pthread_cond_wait(&_consumer_cond, &_mutex);_cwait_num--;cout << "消费者被唤醒。。。" << endl;}*out = q.front();q.pop();//解锁pthread_mutex_unlock(&_mutex);if (_pwait_num){cout << "叫醒生产者" << endl;pthread_cond_signal(&_productor_cond);}}~blockqueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor_cond);pthread_cond_destroy(&_consumer_cond);}private:queue<T> q; //保存数据的容器(队列), 临界资源int _cap;    //队列最大容量pthread_mutex_t _mutex;   //作为临界资源的锁pthread_cond_t _productor_cond;  //生产者条件变量pthread_cond_t _consumer_cond;   //消费者条件变量int _pwait_num;          //生产者等待队列内线程计数int _cwait_num;          //消费者等待队列内线程计数};};

运行发现没有什么问题!!!

一个潜在的隐患(判断是否为满/空的优化)

由于我们上面这个是单生产者-单消费者的模式,如果改成多生产者-多消费者,当生产者生产速度远远大于消费者消费的速度那就会有少数的生产者线程在等待队列却受到了大量的生产者发来的解除等待的信号,那大量的signal进行唤醒肯定会容易出错,再加上某些系统(如 Linux)可能为了性能,允许线程在未收到信号时提前唤醒,多个线程竞争锁时,由于不符合加锁条件,而唤醒对立线程,大量的signal某些线程可能被错误唤醒,这种情况称为伪唤醒,pthread_cond_wait允许伪唤醒。如果伪唤醒时不满足进入临界区的条件按我们的代码根本拦不住这个线程,所以每次消费者/生产者从等待队列被唤醒时是不是要再次检查我们之前设置的不能进入临界区的条件才更安全。

所以不能进入临界区的条件由if改成while循环是我们的常规操作,保证最后要退出循环时再检查一次。实际上我们一般会让生产/消费的时间更加均衡。

切换多生产者-多消费者

上面那个程序是单生产者-单消费者的形式,我们上面说了只需要增加线程就可以实现多生产者-多消费者的模式。

int main()
{blockqueue<int>* bq = new blockqueue<int>(6);//单生产,单消费pthread_t c1, c2, p1, p2, p3;pthread_create(&c1, nullptr, consumer, bq);pthread_create(&p2, nullptr, consumer, bq);pthread_create(&p1, nullptr, productor, bq);pthread_create(&p2, nullptr, productor, bq);pthread_create(&p3, nullptr, productor, bq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);return 0;
}

我们这个代码基本上写完了,但是还没完,我们还可以优化,但是需要先讲一下条件变量的封装。

条件变量的封装

条件变量的封装主要是用我们的类模拟条件变量的常见函数调用。唯一私有成员自然就是我们的条件变量下面封装函数:

构造函数/析构函数

由于我们是要动态构造一个cond,所以构造的时候直接调用pthread_cond_init,析构时直接调用pthread_cond_destroy

class cond{public:cond(){int n = pthread_cond_init(&_cond, nullptr);(void)n;}~cond(){int n = pthread_cond_destroy(&_cond);(void)n;}private:pthread_cond_t _cond;};

wait

这个函数就是封装的pthread_cond_wait。条件变量已经有了,只需要传入一个锁,我们没有锁怎么办,我们之前封装过一个锁,直接把那个代码拿过来,传入我们自己封装的锁对象。

由于原本封装锁的函数的锁是私有的,所以需要写一个getlock将锁的地址带出来,方便我们直接传入。

void wait(mutex& mut){int n = pthread_cond_wait(&_cond, mut.getlock());(void)n;}

notify/notifyall

notify这个函数就是封装的pthread_cond_signal,用于唤醒正在等待的线程,notifyall封装的是pthread_cond_broadcast,也是唤醒线程,这个接口可以唤醒全部在等待的线程。所以线程的唤醒我们用了两个接口。

void notify(){int n = pthread_cond_signal(&_cond);(void)n;}void notifyall(){int n = pthread_cond_broadcast(&_cond);(void)n;}

完整代码

#pragma once
#include<iostream>
#include<pthread.h>
#include"mutex.hpp"
using namespace lockmodule;
using namespace std;
namespace condmodule
{class cond{public:cond(){int n = pthread_cond_init(&_cond, nullptr);(void)n;}void wait(mutex& mut){int n = pthread_cond_wait(&_cond, mut.getlock());(void)n;}void notify(){int n = pthread_cond_signal(&_cond);(void)n;}void notifyall(){int n = pthread_cond_broadcast(&_cond);(void)n;}~cond(){int n = pthread_cond_destroy(&_cond);(void)n;}private:pthread_cond_t _cond;};};

我这突然写一个条件变量的封装是要干什么,也不测试,当然是要直接使用我们自己封装的条件变量,之前写的阻塞队列是不是用到了条件变量和锁,我们就用自己封装的条件变了和锁替换pthread库的。

namespace blockqueuemodule
{static const int gcap = 10;template<class T>class blockqueue{public:blockqueue(int cap = gcap) //给一个缺省值:_cap(cap),_pwait_num(0),_cwait_num(0){}bool isfull(){return q.size() == _cap;}void equeue(const T& in) //入队函数{//加锁lockguard lockg(_mutex);while (isfull()){cout << "生产者进入等待。。。" << endl;_pwait_num++;//加入生产者的等待队列_productor_cond.wait(_mutex);_pwait_num--;cout << "生产者被唤醒。。。" << endl;}q.push(in); //写入//解锁if (_cwait_num){cout << "叫醒消费者" << endl;_consumer_cond.notify();}}bool isempty(){return q.empty();}void pop(T* out) //出队函数{//加锁lockguard lockg(_mutex);while (isempty()){cout << "消费者进入等待。。。" << endl;_cwait_num++;//加入消费者的等待队列_consumer_cond.wait(_mutex);_cwait_num--;cout << "消费者被唤醒。。。" << endl;}*out = q.front();q.pop();if (_pwait_num){cout << "叫醒生产者" << endl;_productor_cond.notify();}}~blockqueue(){}private:queue<T> q; //保存数据的容器(队列), 临界资源int _cap;    //队列最大容量mutex _mutex;   //作为临界资源的锁cond _productor_cond;  //生产者条件变量cond _consumer_cond;   //消费者条件变量int _pwait_num;          //生产者等待队列内线程计数int _cwait_num;          //消费者等待队列内线程计数};};

注意,这里条件变量和锁用我们自己的,析构和构造就不用写了因为是内置类型还自己调用自己的,加锁和解锁用的RAII模式也是自动进行的(看不懂的看之前锁的封装有详细讲),放入等待队列,唤醒机制也是用的我们自己的。有的同学就会发现自动调用解锁的话,解锁不就在唤醒后面了吗,这里没有问题,当 pthread_cond_wait 被唤醒并尝试自动重新加锁时,如果此时锁已经被其他线程持有(即处于加锁状态),当前线程会阻塞,直到成功获取锁后才会返回。这是条件变量正常工作的重要机制,确保线程安全,当然先解锁也可以的。

使用阻塞队列传递任务

前面完成了对一些函数等的改造,但是queue里面的参数T不仅可以是整型这种数字呀,还可以任何可调用对象比如类呀,我们可以封装一个任务类,让阻塞队列传递任务。

比如我们再整一个执行excute内部方法的类作为任务类,然后通过生产者每次传递不同的x和y,交由消费者读取并执行excute方法完成读取分析,sleep(1)模拟任务处理时长。

#pragma once
#include<iostream>
#include<unistd.h>
using namespace std;namespace taskmodule
{class Task{public:Task(){}Task(int a, int b):x(a),y(b){}void excute(){sleep(1); result = x + y;}int X(){return x;}int Y(){return y;}int Result(){return result;}~Task(){}private:int x;int y;int result;};};

对于生产者来说,x,y采用随机数生成,然后随机数种子结合时间戳^进程pid的值进行生成,由于这两个都有很强的随机性和不重复性,所以保证每次运行时生成的随机数种子不同。

srand(time(nullptr) ^ getpid());

 

#include"blockqueue.hpp"
#include"task.hpp"
using namespace blockqueuemodule;
using namespace taskmodule;//消费---读取
void* consumer(void* args)
{blockqueue<Task>* bq = static_cast<blockqueue<Task>*>(args);while (true){sleep(1);Task t;  //task.hpp中无参构造的作用就体现了bq->pop(&t);t.excute();// 做处理printf("consumer, 处理了一个任务: %d + %d = %d\n", t.X(), t.Y(), t.Result());}}//生产---写入
void* productor(void* args)
{blockqueue<Task>* bq = static_cast<blockqueue<Task>*>(args);while (true){int x = rand() % 10 + 1; //[1, 10]int y = rand() % 10 + 1; //[1, 10]Task t(x, y);//写到bq中bq->equeue(t);printf("producter, 生产了一个数据: %d + %d = ?\n", t.X(), t.Y());}
}int main()
{srand(time(nullptr) ^ getpid());blockqueue<Task>* bq = new blockqueue<Task>(6);//单生产,单消费pthread_t c1, c2, p1, p2, p3;pthread_create(&c1, nullptr, consumer, bq);pthread_create(&p2, nullptr, consumer, bq);pthread_create(&p1, nullptr, productor, bq);pthread_create(&p2, nullptr, productor, bq);pthread_create(&p3, nullptr, productor, bq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);return 0;
}

使用回调函数实现任务处理

我们使用类的话其实结构扩展性特别强,但是不是很灵活,回调函数也是可调用对象呀,也可以放进队列中呀,而且使用回调函数我们想插入什么任务函数就插入什么任务函数,还不用大费周章的写个类。

using task_t = function<void()>;

 回调函数选择无返回值无参数的函数。

#include"blockqueue.hpp"
#include<functional>
using namespace blockqueuemodule;
using task_t = function<void()>;void test()
{cout << "haha test..." << endl;
}void hello()
{cout << "hehe hello..." << endl;
}
//消费---读取
void* consumer(void* args)
{blockqueue<task_t>* bq = static_cast<blockqueue<task_t>*>(args);while (true){sleep(1);task_t t;  //task.hpp中无参构造的作用就体现了bq->pop(&t);// 做处理t();//printf("consumer, 处理了一个任务: %d + %d = %d\n", t.X(), t.Y(), t.Result());printf("consumer, 处理了一个任务\n");}}//生产---写入
void* productor(void* args)
{blockqueue<task_t>* bq = static_cast<blockqueue<task_t>*>(args);while (true){//int x = rand() % 10 + 1; //[1, 10]//int y = rand() % 10 + 1; //[1, 10]//写到bq中bq->equeue(test);bq->equeue(hello);printf("producter, 生产了一个任务\n");}
}int main()
{//srand(time(nullptr) ^ getpid());blockqueue<task_t>* bq = new blockqueue<task_t>(6);//单生产,单消费pthread_t c1, c2, p1, p2, p3;pthread_create(&c1, nullptr, consumer, bq);pthread_create(&p2, nullptr, consumer, bq);pthread_create(&p1, nullptr, productor, bq);pthread_create(&p2, nullptr, productor, bq);pthread_create(&p3, nullptr, productor, bq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);return 0;
}

生产者每次都向队列中投入两个我们构造的任务函数,消费者通过回调函数获取任务并执行。这样消费者就可以执行不同的任务了。

总结

生产者-消费者模型中,生产者的生产资料来自自己造的或者外部输入,消费者不仅仅读取数据,还需要做处理,分析该模型的效率不仅仅只有生产者写入缓存和消费者读取缓存这两个串行执行的效率,更多的整体效率应该放在生产者获取数据的过程和消费者处理数据的过程,这两个并行的过程才是耗时最多的。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词