目录
一、生产者消费者模型
1. 生产者消费者模型是什么?
2. 为什么使用生产者消费者模型?
3. 生产者消费者模型的特点(321原则)
🌵3种关系
🌵2种角色
🌵1个交易场所
二、基于BlockingQueue的生产者消费者模型
1. 什么是BlockingQueue?
2. C++实现阻塞队列
3. 生产者和消费者的实现
4. 生产者和消费者速度不一致的情况
5. 基于计算任务的生产者消费者模型
一、生产者消费者模型
1. 生产者消费者模型是什么?
生产者消费者模型是一种经典的多线程设计模式,用于解决生产者和消费者之间的数据交换问题。在这个模型中,生产者负责生成数据,而消费者负责处理数据。两者通过一个共享的缓冲区(通常是阻塞队列)进行通信,而不是直接交互。这种间接通信方式有效地降低了生产者和消费者之间的耦合度。
2. 为什么使用生产者消费者模型?
-
解耦:生产者和消费者不需要直接通信,它们通过共享缓冲区进行数据交换,降低了模块间的依赖性。
-
支持并发:生产者和消费者可以同时运行,提高了系统的并发性能。
-
支持忙闲不均:生产者和消费者的处理速度可以不同,通过缓冲区来平衡两者的处理能力。
3. 生产者消费者模型的特点(321原则)
🌵3种关系
-
生产者和生产者(互斥关系):多个生产者不能同时向缓冲区写入数据。
-
消费者和消费者(互斥关系):多个消费者不能同时从缓冲区读取数据。
-
生产者和消费者(互斥关系、同步关系):生产者和消费者不能同时访问缓冲区,且需要协调访问顺序。
🌵2种角色
-
生产者:负责生成数据并放入缓冲区。
-
消费者:负责从缓冲区中取出数据并处理。
🌵1个交易场所
-
缓冲区:通常是内存中的一段区域,用于存储生产者生成的数据。
二、基于BlockingQueue的生产者消费者模型
1. 什么是BlockingQueue?
阻塞队列(Blocking Queue)是一种特殊的队列,当队列为空时,从队列获取元素的操作会被阻塞;当队列满时,向队列放入元素的操作会被阻塞。阻塞队列在多线程编程中非常有用,可以简化生产者消费者模型的实现。
2. C++实现阻塞队列
以下是一个简单的阻塞队列实现,使用C++的std::queue
和POSIX线程库(pthread
)。
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>template <class T>
class BlockQueue
{
private:bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();}public:BlockQueue(int cap = 5) : _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}// 生产者调用:向阻塞队列插入数据void Push(const T &data){pthread_mutex_lock(&_mutex);while (IsFull()){// 队列满,生产者等待pthread_cond_wait(&_full, &_mutex);}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); // 唤醒消费者}// 消费者调用:从阻塞队列获取数据void Pop(T &data){pthread_mutex_lock(&_mutex);while (IsEmpty()){// 队列空,消费者等待pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); // 唤醒生产者}private:std::queue<T> _q; // 阻塞队列int _cap; // 队列容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _full; // 队列满时的条件变量pthread_cond_t _empty; // 队列空时的条件变量
};
代码说明:
-
互斥锁:保护共享资源(阻塞队列)的访问。
-
条件变量:
_full
用于通知生产者队列已满,_empty
用于通知消费者队列为空。 -
Push和Pop方法:分别用于生产者和消费者的操作,确保线程安全。
3. 生产者和消费者的实现
生产者线程
void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1); // 模拟生产时间int data = rand() % 100 + 1; // 生成随机数据bq->Push(data); // 将数据放入阻塞队列std::cout << "Producer: " << data << std::endl;}return nullptr;
}
消费者线程
void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1); // 模拟消费时间int data = 0;bq->Pop(data); // 从阻塞队列中取出数据std::cout << "Consumer: " << data << std::endl;}return nullptr;
}
主函数
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;// 创建生产者和消费者线程pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);// 等待线程结束pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}
4. 生产者和消费者速度不一致的情况
生产者快,消费者慢
如果生产者生成数据的速度比消费者消费数据的速度快,阻塞队列会很快被填满。此时,生产者会被阻塞,直到消费者消费数据后释放空间。
void *Producer(void *arg)
{BlockQueue<int> *bq = (BlockQueue<int> *)arg;// 生产者不断进行生产while (true){int data = rand() % 100 + 1;bq->Push(data); // 生产数据std::cout << "Producer: " << data << std::endl;}
}
void *Consumer(void *arg)
{BlockQueue<int> *bq = (BlockQueue<int> *)arg;// 消费者不断进行消费while (true){sleep(1);int data = 0;bq->Pop(data); // 消费数据std::cout << "Consumer: " << data << std::endl;}
}
生产者慢,消费者快
如果消费者消费数据的速度比生产者生成数据的速度快,阻塞队列会很快变空。此时,消费者会被阻塞,直到生产者生成新的数据。
void *Producer(void *arg)
{BlockQueue<int> *bq = (BlockQueue<int> *)arg;// 生产者不断进行生产while (true){sleep(1);int data = rand() % 100 + 1;bq->Push(data); // 生产数据std::cout << "Producer: " << data << std::endl;}
}
void *Consumer(void *arg)
{BlockQueue<int> *bq = (BlockQueue<int> *)arg;// 消费者不断进行消费while (true){int data = 0;bq->Pop(data); // 消费数据std::cout << "Consumer: " << data << std::endl;}
}
5. 基于计算任务的生产者消费者模型
定义任务类
#pragma onceclass Task
{
public:Task(int x = 0, int y = 0, char op = '+') : _x(x), _y(y), _op(op) {}void Run(){int result = 0;switch (_op){case '+':result = _x + _y;break;case '-':result = _x - _y;break;case '*':result = _x * _y;break;case '/':if (_y == 0){std::cout << "Warning: division by zero!" << std::endl;result = -1;}else{result = _x / _y;}break;default:std::cout << "Error: invalid operation!" << std::endl;}std::cout << _x << " " << _op << " " << _y << " = " << result << std::endl;}private:int _x, _y;char _op;
};
生产者和消费者线程
void *Producer(void *arg)
{BlockQueue<Task> *bq = (BlockQueue<Task> *)arg;const char *ops = "+-*/";while (true){int x = rand() % 100;int y = rand() % 100 + 1; // 避免除以零char op = ops[rand() % 4];Task t(x, y, op);bq->Push(t);std::cout << "Produced task: " << x << " " << op << " " << y << std::endl;}return nullptr;
}void *Consumer(void *arg)
{BlockQueue<Task> *bq = (BlockQueue<Task> *)arg;while (true){Task t;bq->Pop(t);t.Run();}return nullptr;
}