欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 养生 > Linux操作系统7- 线程同步与互斥5(POSIX条件变量生产者消费者模型的进一步使用)

Linux操作系统7- 线程同步与互斥5(POSIX条件变量生产者消费者模型的进一步使用)

2025/7/23 3:12:46 来源:https://blog.csdn.net/yzcllzx/article/details/146457409  浏览:    关键词:Linux操作系统7- 线程同步与互斥5(POSIX条件变量生产者消费者模型的进一步使用)

上篇文章:Linux操作系统7- 线程同步与互斥4(基于POSIX条件变量的生产者消费者模型)-CSDN博客

本篇代码仓库:

支持处理简单任务的生产者消费者模型代码

生产者-消费者-保存者三线程两队列模型

多生产多消费的生产者消费者模型

        进一步使用生产者消费者模型不需要修改Queue.hpp

#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的最大容量template <class T>
class BlockQueue
{
public:BlockQueue(int maxnum = gnum) : _maxnum(maxnum){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}// 生产者生产数据void push(const T &in){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否满足生产while (is_full()){// 数据满了生产者等待消费者消费pthread_cond_wait(&_pcond, &_mtx);}// 生产数据_queue.push(in);// 队列不为空,通知消费者消费pthread_cond_signal(&_ccond);// 解锁pthread_mutex_unlock(&_mtx);}// 消费这消费数据,通过输入输出型参数获取数据void pop(T *out){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否可以消费数据while (is_empty()){// 等待生产者生产数据pthread_cond_wait(&_ccond, &_mtx);}// 开始消费数据*out = _queue.front();_queue.pop();// 队列不满,通知生产者生产数据pthread_cond_signal(&_pcond);// 解锁pthread_mutex_unlock(&_mtx);}private:bool is_empty(){return _queue.empty();}bool is_full(){return _queue.size() == _maxnum;}private:std::queue<T> _queue;  // 阻塞队列size_t _maxnum;        // 队列最大容量pthread_mutex_t _mtx;  // 互斥锁pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};

目录

一. 生存消费模型处理简单任务

1.1 需求分析

1.2 Task.hpp

1.3 Main.cpp 

1.4 测试

二. 生产者-消费者-保存者-三线程两队列

2.1 Task.hpp 

2.2 Main.cpp

2.3 测试

三. 多生产多消费模型 

四. 多生产多消费的意义


一. 生存消费模型处理简单任务

1.1 需求分析

        上篇文章中,我们使用了生存者消费者模型实现了生产者产生随机数据放入BlockQueue,而消费者从BlockQueue中拿取数据输出。

        现在想让生产者消费者模型处理一个简单的任务。比如:生产者产生两个随机数据和运算符,而消费者拿取数据获取计算结果并输出。

        此时就需要一个Task.hpp来构造任务。

1.2 Task.hpp

        创建一个结构体,内部包含计算数据,计算操作符,以及()重载

代码框架如下:

#pragma once
#include <iostream>
#include <functional>class CalTask
{// 使用c++11 using 和 包装器using func_t = std::function<int(int, int, char)>;// 当然也可以使用函数指针// typedef int (*func_t)(int, int, char);public:
private:int _x;int _y;char _op;func_t _callback; // 通过回调函数调用计算
};

        具体实现如下:需要增加构造函数,析构函数,重载(),回调函数等。

#pragma once
#include <iostream>
#include <functional>class CalTask
{// 使用c++11 using 和 包装器using func_t = std::function<int(int, int, char)>;// 当然也可以使用函数指针// typedef int (*func_t)(int, int, char);public:CalTask() {}CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func) {}~CalTask() {}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = %d", _x, _op, _y, result);return buffer;}private:int _x;int _y;char _op;func_t _callback; // 通过回调函数调用计算
};// 计算函数
int my_math(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero" << std::endl;return -1;}else{result = x / y;}break;}case '%':if (y == 0){std::cerr << "moved zero" << std::endl;return -1;}else{result = x % y;}break;default:break;}return result;
}

1.3 Main.cpp 

        根据分析,生产者生产数据然后交给消费者计算并输出。

#include <iostream>
#include <string>#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"
#include "Task.hpp"const std::string OP = "+-*/%";
void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<CalTask> *bq = static_cast<BlockQueue<CalTask> *>(args);while (true){int x = rand() % 100;int y = rand() % 100;char op = OP[rand() % OP.size()];// 打印日志printf("生产者生产的数据:%d %c %d 并交给消费者计算\n", x, op, y);CalTask t(x, y, op, my_math);bq->push(t);usleep(500000);}return nullptr;
}void *consumer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<CalTask> *bq = static_cast<BlockQueue<CalTask> *>(args);while (true){CalTask t;bq->pop(&t);std::cout << "消费者获取数据并计算:" << t() << std::endl;}return nullptr;
}int main()
{srand(time(0) ^ getpid() ^ rand());// 定义生产消费线程与阻塞队列pthread_t p;pthread_t c;BlockQueue<CalTask> *bq = new BlockQueue<CalTask>();pthread_create(&p, nullptr, producer, (void *)bq);pthread_create(&c, nullptr, consumer, (void *)bq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete bq;bq = nullptr;return 0;
}

1.4 测试

测试结果如下:

        可以看到,通过生产者消费者模型。消费者成功的获取了来自生产传递的数据,并且计算这个结果。

        如果有需要的话,可以将更复杂的任务交给生产者消费者模型。比如网络/本地的IO操作,时间复杂度较高的运算。 

二. 生产者-消费者-保存者-三线程两队列

        能否实现一个生产者生产数据交给消费者处理,消费者处理完成之后再交给保存者将结果保存在文件中?

        生产线程生产任务传递给任务队列

        消费线程从任务队列读取任务,消费完成任务之后将结果传递给保存队列

        保存线程从保存队列读取数据并写入文件中

        同时需要保证生产者消费者之间的同步,消费者与保存者之间的同步。

2.1 Task.hpp 

        需要给保存者一个任务用于保存,以解耦主函数和任务处理。

#pragma once
#include <iostream>
#include <functional>class CalTask
{// 使用c++11 using 和 包装器using func_t = std::function<int(int, int, char)>;// 当然也可以使用函数指针// typedef int (*func_t)(int, int, char);public:CalTask() {}CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func) {}~CalTask() {}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = %d", _x, _op, _y, result);return buffer;}private:int _x;int _y;char _op;func_t _callback; // 通过回调函数调用计算
};class SaveTask
{typedef void (*func_t)(const std::string &);public:SaveTask(std::string _result = "", func_t func = Save): _callback(func) {}void operator()(){_callback(_result);}private:std::string _result;func_t _callback;
};// 计算函数
int my_math(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero" << std::endl;return -1;}else{result = x / y;}break;}case '%':if (y == 0){std::cerr << "moved zero" << std::endl;return -1;}else{result = x % y;}break;default:break;}return result;
}void Save(const std::string &result)
{const std::string task_pwd = "./result.txt";FILE *fp = fopen(task_pwd.data(), "a"); // 需要追加写入if (fp == nullptr){std::cerr << "fopen error" << std::endl;}fputs(result.c_str(), fp);fclose(fp);fp = nullptr;
}

2.2 Main.cpp

        为了能够让消费者同时访问两个队列,需要一个结构体能够存储两个队列。

在BlockQueue.hpp中新增下面的代码即可

template <class C, class S>
struct BlockQueues
{BlockQueue<C> *_c_bq;BlockQueue<S> *_s_bq;
};

        主函数中需要新增一个保存者函数,用于保存者线程拿取数据并保存于文件中。同时消费者也需要新增一段将数据传递给保存者的逻辑 。

#include <iostream>
#include <memory>
#include <string>#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"
#include "Task.hpp"const std::string OP = "+-*/%";
void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<CalTask> *cal_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_c_bq;while (true){int x = rand() % 100;int y = rand() % 100;char op = OP[rand() % OP.size()];// 打印日志printf("生产者生产的数据:%d %c %d 并交给消费者计算\n", x, op, y);CalTask ct(x, y, op, my_math);cal_bq->push(ct);sleep(1);}return nullptr;
}void *consumer(void *args)
{// 获取交易场所 - 生产消费阻塞队列,消费保存阻塞队列BlockQueue<CalTask> *cal_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_c_bq;BlockQueue<SaveTask> *save_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_s_bq;while (true){// 获取任务计算CalTask ct;cal_bq->pop(&ct);std::string result = ct();std::cout << "消费者获取数据并计算:" << result << std::endl;// 将任务传递给保存者SaveTask st(result, Save);std::cout << "消费者获取传递计算结果给保存者:" << result << std::endl;save_bq->push(st);}return nullptr;
}void *saver(void *args)
{// 获取交易场所 - 消费保存阻塞队列BlockQueue<SaveTask> *save_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_s_bq;while (true){SaveTask st;save_bq->pop(&st);st();std::cout << "保存者成功读取数据并保存在文件中:" << std::endl;}return nullptr;
}int main()
{srand(time(0) ^ getpid() ^ rand());// 定义消费生产保存线程,与两个阻塞队列BlockQueues<CalTask, SaveTask> *bqs = new BlockQueues<CalTask, SaveTask>();bqs->_c_bq = new BlockQueue<CalTask>;bqs->_s_bq = new BlockQueue<SaveTask>;pthread_t p;pthread_t c;pthread_t s;pthread_create(&p, nullptr, producer, (void *)bqs);pthread_create(&c, nullptr, consumer, (void *)bqs);pthread_create(&s, nullptr, saver, (void *)bqs);pthread_join(p, nullptr);pthread_join(c, nullptr);pthread_join(s, nullptr);delete bqs->_c_bq;delete bqs->_s_bq;return 0;
}

2.3 测试

三. 多生产多消费模型 

        只需要更改主函数即可实现多生产多消费模型

int main()
{srand((unsigned int)time(0) ^ getpid());// 建立任务队列和保存队列BlockQueues<CalTask, SaveTask> *bqs = new BlockQueues<CalTask, SaveTask>;bqs->_c_bq = new BlockQueue<CalTask>;bqs->_s_bq = new BlockQueue<SaveTask>;pthread_t c[3], p[2], s;pthread_create(c, nullptr, consumer, (void *)bqs);pthread_create(c + 1, nullptr, consumer, (void *)bqs);pthread_create(c + 2, nullptr, consumer, (void *)bqs);pthread_create(p, nullptr, producer, (void *)bqs);pthread_create(p + 1, nullptr, producer, (void *)bqs);pthread_create(&s, nullptr, saver, (void *)bqs);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(c[2], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(s, nullptr);delete bqs->_c_bq;delete bqs->_s_bq;return 0;
}

运行结果如下:

 

四. 多生产多消费的意义

        即便有多个生产者,多个消费者。但是锁只有一个,所以同一时刻只能有一个线程在临界区中执行代码。那么多生产多消费有什么意义?

        1 当生产者A将数据传递给队列后,产生新的数据非常耗时间,此时其他生产者可以获取锁并投放数据,而生产者A可以同时产生数据

        2 当消费者拿到一个数据进行消费的时候,其他消费者仍可以从队列中拿取新数据进行消费。而不需要等待消费者A消费完了其他消费者才去消费

        3 即可以让生产者线程生产之前,消费者线程消费之后。让线程并发执行(而不是提高存取,拿取数据的效率)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com