欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > Linux生产者消费者模型

Linux生产者消费者模型

2025/5/22 21:51:25 来源:https://blog.csdn.net/m0_74447750/article/details/146461154  浏览:    关键词:Linux生产者消费者模型

Linux生产者消费者模型

  • Linux生产者消费者模型详解
    • 生产者消费者模型
      • 生产者消费者模型的概念
      • 生产者消费者模型的特点
      • 生产者消费者模型优点
    • 基于BlockingQueue的生产者消费者模型
      • 基于阻塞队列的生产者消费者模型
      • 模拟实现基于阻塞队列的生产消费模型
        • 基础实现
        • 生产者消费者步调调整
        • 条件唤醒优化
        • 基于计算任务的扩展
    • 总结


Linux生产者消费者模型详解


生产者消费者模型

生产者消费者模型的概念

生产者消费者模型通过一个容器解决生产者与消费者的强耦合问题。

  • 通信方式:生产者不直接与消费者交互,而是将数据放入容器;消费者从容器取数据。
  • 容器作用:缓冲区,解耦生产者与消费者,平衡双方处理能力。

生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的经典场景,具有以下特点:

  1. 三种关系
    • 生产者与生产者:互斥(竞争容器访问)。
    • 消费者与消费者:互斥(竞争容器访问)。
    • 生产者与消费者:互斥(共享容器)+同步(生产消费顺序)。
  2. 两种角色:生产者与消费者(线程或进程)。
  3. 一个交易场所:内存缓冲区(如队列)。

互斥原因:容器是临界资源,需用互斥锁保护,多线程竞争访问。
同步原因

  • 容器满时,生产者需等待,避免生产失败。
  • 容器空时,消费者需等待,避免消费失败。
  • 同步确保有序访问,防止饥饿,提高效率。

注意:互斥保证数据正确性,同步实现线程协作。

生产者消费者模型优点

  1. 解耦:生产者与消费者独立运行,通过容器间接交互。
  2. 支持并发:生产者生产时,消费者可同时消费。
  3. 支持忙闲不均:容器缓冲数据,平衡处理速度差异。

对比函数调用(紧耦合),生产者消费者模型是松耦合设计,生产者无需等待消费者处理。


基于BlockingQueue的生产者消费者模型

基于阻塞队列的生产者消费者模型

在多线程编程中,**阻塞队列(Blocking Queue)**是实现生产者消费者模型的常用数据结构。

  • 与普通队列的区别
    • 队列空时,取元素操作阻塞,直到有数据。
    • 队列满时,放元素操作阻塞,直到有空间。
  • 应用场景:类似管道通信。

模拟实现基于阻塞队列的生产消费模型

基础实现

以单生产者、单消费者为例,使用C++ queue 实现阻塞队列:

BlockQueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>#define NUM 5template<class T>
class BlockQueue {
private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }
public:BlockQueue(int cap = NUM) : _cap(cap) {pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue() {pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}void Push(const T& data) {pthread_mutex_lock(&_mutex);while (IsFull()) {pthread_cond_wait(&_full, &_mutex); // 队列满,等待}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); // 唤醒消费者}void Pop(T& data) {pthread_mutex_lock(&_mutex);while (IsEmpty()) {pthread_cond_wait(&_empty, &_mutex); // 队列空,等待}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); // 唤醒生产者}
private:std::queue<T> _q; // 阻塞队列int _cap; // 容量上限pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _full; // 满条件变量pthread_cond_t _empty; // 空条件变量
};

main.cpp

#include "BlockQueue.hpp"
#include <unistd.h>void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}

说明

  • 单生产者单消费者:无需维护生产者间或消费者间的互斥。
  • 互斥_mutex 保护队列。
  • 同步_full_empty 条件变量控制生产消费顺序。
  • 条件判断:用 while 防止伪唤醒。
  • 运行结果:生产者与消费者步调一致,每秒交替生产消费。
生产者消费者步调调整
  1. 生产快,消费慢

    void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;}
    }
    void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;}
    }
    
    • 生产者快速填满队列后等待,消费者消费一个后唤醒生产者,后续步调一致。
  2. 生产慢,消费快

    void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;}
    }
    void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;}
    }
    
    • 消费者初始等待生产者生产,消费后继续等待,步调随生产者。
条件唤醒优化

调整唤醒条件,例如队列数据量超一半时唤醒消费者,小于一半时唤醒生产者:

void Push(const T& data) {pthread_mutex_lock(&_mutex);while (IsFull()) {pthread_cond_wait(&_full, &_mutex);}_q.push(data);if (_q.size() >= _cap / 2) {pthread_cond_signal(&_empty); // 超一半唤醒消费者}pthread_mutex_unlock(&_mutex);
}
void Pop(T& data) {pthread_mutex_lock(&_mutex);while (IsEmpty()) {pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();if (_q.size() <= _cap / 2) {pthread_cond_signal(&_full); // 少于一半唤醒生产者}pthread_mutex_unlock(&_mutex);
}
  • 效果:生产者快速填满队列后等待,消费者消费至一半以下才唤醒生产者。
基于计算任务的扩展

将队列存储类型改为任务类,扩展功能:

Task.hpp

#pragma once
#include <iostream>class Task {
public:Task(int x = 0, int y = 0, char op = 0) : _x(x), _y(y), _op(op) {}void Run() {int result = 0;switch (_op) {case '+': result = _x + _y; break;case '-': result = _x - _y; break;case '*': result = _x * _y; break;case '/': if (_y == 0) { std::cout << "Warning: div zero!" << std::endl; result = -1; }else { result = _x / _y; } break;case '%': if (_y == 0) { std::cout << "Warning: mod zero!" << std::endl; result = -1; }else { result = _x % _y; } break;default: std::cout << "error operation!" << std::endl; break;}std::cout << _x << " " << _op << " " << _y << "=" << result << std::endl;}
private:int _x, _y;char _op;
};

main.cpp

#include "BlockQueue.hpp"
#include "Task.hpp"void* Producer(void* arg) {BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;const char* ops = "+-*/%";while (true) {int x = rand() % 100;int y = rand() % 100;char op = ops[rand() % 5];Task t(x, y, op);bq->Push(t);std::cout << "Producer task done" << std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;while (true) {sleep(1);Task t;bq->Pop(t);t.Run();}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<Task>* bq = new BlockQueue<Task>;pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}
  • 功能:生产者生成计算任务,消费者执行计算并输出结果。
  • 扩展性:通过定义不同 Task 类实现多样化任务处理。

总结

  • 模型核心:通过容器解耦生产者与消费者,支持并发与忙闲不均。
  • 实现关键:阻塞队列结合互斥锁与条件变量,确保互斥与同步。
  • 灵活性:可调整步调、唤醒条件,或扩展为复杂任务处理。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词