欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > C++ 简单线程池实现

C++ 简单线程池实现

2025/9/27 15:08:58 来源:https://blog.csdn.net/qq_51578257/article/details/147616784  浏览:    关键词:C++ 简单线程池实现

实现代码 

#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>class ThreadPool {
public:ThreadPool(size_t threads) : stop(false) {for(size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {for(;;) {std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});}}template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;}~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for(std::thread &worker: workers)worker.join();}private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop;
};

使用示例

#include <iostream>
#include <chrono>int main() {ThreadPool pool(4);// 提交多个任务到线程池std::vector<std::future<int>> results;for(int i = 0; i < 8; ++i) {results.emplace_back(pool.enqueue([i] {std::cout << "hello " << i << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "world " << i << std::endl;return i*i;}));}// 获取结果for(auto && result: results)std::cout << result.get() << ' ';std::cout << std::endl;return 0;
}

解析

ThreadPool 类定义

class ThreadPool {
public:ThreadPool(size_t threads);  // 构造函数,指定线程数量~ThreadPool();               // 析构函数template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;private:std::vector<std::thread> workers;       // 工作线程集合std::queue<std::function<void()>> tasks; // 任务队列std::mutex queue_mutex;                 // 任务队列互斥锁std::condition_variable condition;      // 条件变量bool stop;                              // 停止标志
};

构造函数解析

ThreadPool(size_t threads) : stop(false) {for(size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {// 线程工作函数for(;;) {std::function<void()> task;{// 获取队列锁std::unique_lock<std::mutex> lock(this->queue_mutex);// 等待条件满足:停止或任务队列非空this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });// 如果停止且任务为空,线程退出if(this->stop && this->tasks.empty())return;// 获取任务task = std::move(this->tasks.front());this->tasks.pop();}// 执行任务(在锁外执行,避免锁持有时间过长)task();}});}
}
  1. stop(false) - 初始化停止标志为false

  2. workers.emplace_back - 创建并启动工作线程

  3. for(;;) - 线程无限循环,等待任务

  4. std::unique_lock<std::mutex> - 获取队列锁

  5. condition.wait - 等待条件变量通知,防止忙等待

       条件:stop || !tasks.empty()(停止或有任务)
  6. 检查是否应该退出线程

  7. 从队列获取任务并移出队列

  8. 在锁外执行任务

enqueue 方法解析

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {// 获取返回类型using return_type = typename std::result_of<F(Args...)>::type;// 创建packaged_task包装可调用对象auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 获取future以便获取结果std::future<return_type> res = task->get_future();{// 获取队列锁std::unique_lock<std::mutex> lock(queue_mutex);// 如果线程池已停止,抛出异常if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");// 将任务添加到队列tasks.emplace([task](){ (*task)(); });}// 通知一个等待线程有新任务condition.notify_one();return res;
}
  1. 模板参数:

    • F - 可调用对象类型

    • Args - 参数类型包

  2. 返回类型推导:

    • std::future<typename std::result_of<F(Args...)>::type>

  3. std::packaged_task - 包装可调用对象,可以获取future

  4. std::bind - 绑定参数

  5. std::forward - 完美转发参数

  6. task->get_future() - 获取与任务关联的future

  7. 锁保护下的队列操作

  8. condition.notify_one() - 通知一个等待线程

析构函数解析

~ThreadPool() {{// 获取锁并设置停止标志std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}// 通知所有线程condition.notify_all();// 等待所有线程完成for(std::thread &worker: workers)worker.join();
}
  1. 设置停止标志stop = true

  2. condition.notify_all() - 唤醒所有等待线程

  3. worker.join() - 等待所有线程结束

动态调整线程数量

void resize(size_t new_size) {if (new_size < workers.size()) {// 减少线程数量{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread &worker : workers)worker.join();workers.clear();stop = false;for (size_t i = 0; i < new_size; ++i) {workers.emplace_back([this] {// 线程工作函数});}} else if (new_size > workers.size()) {// 增加线程数量for (size_t i = workers.size(); i < new_size; ++i) {workers.emplace_back([this] {// 线程工作函数});}}
}

 任务优先级

#include <queue>// 修改任务队列定义
struct Task {std::function<void()> func;int priority;bool operator<(const Task& other) const {return priority < other.priority; // 优先级高的先执行}
};std::priority_queue<Task> tasks;// 修改enqueue方法
template<class F, class... Args>
auto enqueue(int priority, F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {// ... 其他代码不变tasks.emplace(Task{[task](){ (*task)(); }, priority});// ...
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词