欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 国际 > 关于Hipe并发库中动态线程库DynamicThreadPond的一点解读(一)

关于Hipe并发库中动态线程库DynamicThreadPond的一点解读(一)

2025/9/8 10:51:05 来源:https://blog.csdn.net/qq_41596730/article/details/140693164  浏览:    关键词:关于Hipe并发库中动态线程库DynamicThreadPond的一点解读(一)

文章目录

  • 前提
  • `DynamicThreadPond`初始化
  • 自定义任务类型类 `SafeTask`
  • 添加到`shared_tq`中的任务如何执行
  • 总结

前提

最近在学习多线程编程的知识,之前只是简单地了解过多线程的一些基本概念,并没有实操过。这次在网络上找到了一个有关线程池的库——Hipe,仔细地学习下。

这篇博客主要对Hipe中的动态线程池中的一些API的用法,写法,以及整体架构做一个简单的解读,如有不当之外,还请大家多多见谅。

一些碎碎念的东西:之前的博客大部分是记录自己学习中遇到的问题和学习的知识(涉及的知识面较小),很少有对一个完整的工程或者涵盖一个大的知识进行记录。原因在于:一方面怕自己能力不够,误导读者;另一方面则是对一个较为完整的工程记录的话,会花费较多的时间。思来想去,还是要将自己遇到的涉及的知识面广的点记录下来,虽然耗费的时间和精力会更多,但是这样可以督促自己深入去把握这个点,收获会更大,理解会更深。

DynamicThreadPond初始化

DynamicThreadPond初始化比较简单,大概逻辑就是:根据用户的指定的线程数来创建线程,然后将其添加到线程池中。

// dynamic thread pondstd::map<std::thread::id, std::thread> pond;/*** @brief construct DynamicThreadPond* @param tnumb initial thread number*/explicit DynamicThreadPond(int tnumb = 0) {addThreads(tnumb);}/*** @brief add threads* @param tnumb thread number* The pond will expand through creating new thread.*/void addThreads(int tnumb = 1) {assert(tnumb >= 0);expect_tnumb += tnumb;HipeLockGuard lock(shared_locker);while (tnumb--) {std::thread t(&DynamicThreadPond::worker, this);  // 创建线程pond.emplace(std::make_pair<std::thread::id, std::thread>(t.get_id(), std::move(t)));  // 添加到线程池中}}

自定义任务类型类 SafeTask

线程池通过重用线程来减少创建和销毁线程的开销。当有任务时,线程池会从池中取出一个空闲线程来处理任务,而不是每次都创建新线程。

在这里,任务可以是一个函数,lambda表达式,函数对象或任何可以调用的对象。一般来说,在使用线程波时,任务通常被封装成可调用的对象,然后添加到任务队列里,等待线程池里的线程执行。

SafeTask就是Hipe框架中自定义的任务类型,也就是说所有的任务会被封装成SafeTask类型的对象。 被封装为SafeTask类型的任务会添加到任务队列里,相关代码如下:

using HipeTask = util::SafeTask;
std::queue<HipeTask> shared_tq = {}; // 线程池的任务队列/*** @brief submit task* @param foo An runnable object* "Runnable&&" is a special type, we call it "universal reference" in template function,* it can accept lvalue, rvalue, and const lvalue, and it can keep the original type.*/
template <typename Runnable>
void submit(Runnable&& foo) {{//Need to add locker to protect the shared task queue, because it is consumed by multiple threads.HipeLockGuard lock(shared_locker);// push a task into the queueshared_tq.emplace(std::forward<Runnable>(foo));  // std::forward can keep the original type++total_tasks;}awake_cv.notify_one();
}

submit函数在文档中的使用方法如下:

void foo1() {stream.print("call foo1");
}pond.submit([] { stream.print("hello world"); }); // 提交 匿名函数任务
pond.submit(foo1); // 提交普通函数 任务

综上,当使用submit函数提交任务时,线程池会将任务包装为SafeTask对象,然后存到shared_tq这个任务队列中。


所以接下来关注如何将一个函数对象包装为SafeTask对象。

我们主要看下SafeTask的构造函数:

class SafeTask {/***定义一个基类,用于保存不同类型的可运行对象,它是一个纯虚函数类*/struct BaseExec {virtual void call() = 0;virtual ~BaseExec() = default;};/*** 定义一个派生类,用于保存可运行对象,该类继承自BaseExec,将运行对象 foo 保存在内部* 同时实现了call()函数,用于调用保存的可运行对象* is_reference_wrappern 用于判断是否是引用包装器*/template <typename F, typename T = typename std::decay<F>::type>struct GenericExec : BaseExec {T foo;GenericExec(F&& f): foo(std::forward<F>(f)) {static_assert(!is_reference_wrapper<F>::value,"[HipeError]: Use 'reference_wrapper' to save temporary variable is dangerous");}~GenericExec() override = default;void call() override {foo();  // call the runnable object}};public:SafeTask() = default;SafeTask(SafeTask&& other) = default;SafeTask(SafeTask&) = delete;SafeTask(const SafeTask&) = delete;SafeTask& operator=(const SafeTask&) = delete;~SafeTask() = default;// construct a task// is_runnable<F>::value 用于判断是否是可运行对象,使用了SFINAE技术template <typename F, typename = typename std::enable_if<is_runnable<F>::value>::type>SafeTask(F&& foo): exe(new GenericExec<F>(std::forward<F>(foo))) {}// reset the tasktemplate <typename F, typename = typename std::enable_if<is_runnable<F>::value>::type>void reset(F&& foo) {exe.reset(new GenericExec<F>(std::forward<F>(foo)));}// the task was setbool is_set() {return static_cast<bool>(exe);}// override "="SafeTask& operator=(SafeTask&& tmp) {exe.reset(tmp.exe.release());return *this;}// runnable// 最终通过 () 运算符重载,调用保存的可运行对象void operator()() {exe->call();}private:// 保存可运行对象的指针,多态std::unique_ptr<BaseExec> exe = nullptr;
};

上面就是SafeTask的大致理解。

添加到shared_tq中的任务如何执行

将任务添加到队列后,接下来我们关心的是添加的这些任务如何被执行。
还是在添加任务的这个函数里

    template <typename Runnable>void submit(Runnable&& foo) {{// Need to add locker to protect the shared task queue, because it is consumed by multiple threads.HipeLockGuard lock(shared_locker);// push a task into the task queueshared_tq.emplace(std::forward<Runnable>(foo));  // std::forward can keep the original type++total_tasks;}awake_cv.notify_one();}

当添加完任务后,它会调用条件变量awake_cvnotify_one()。这就涉汲到在多线程中条件变量的知识了。在下面我们简单描述下条件变量的使用:


条件变量的基本用法包括等待和通知操作,具体步骤如下:

  1. 创建一个条件变量对象和一个互斥锁对象。
  2. 使用互斥锁保护共享数据。
  3. 在一个线程中等待条件变量(等待时会释放锁,条件满足后会重新获取锁)。
  4. 在另一个线程中修改共享数据并通知条件变量。

template <typename Runnable>
void submit(Runnable&& foo) {// ...awake_cv.notify_one(); // 通知一个正在等待的线程
}void worker() {// ...do {HipeUniqGuard locker(shared_locker);awake_cv.wait(locker, [this] { return !shared_tq.empty() || shrink_numb > 0; });  // 该线程拿到锁后,判断条件,此时条件反回True// ...

所以从上面可知,当任务添加到队列后,就会唤醒一个线程,继续看worker()函数,这也是线程池中的每一个线程的工作函数。

// working threads' default loop
void worker() {// task containerHipeTask task;// ...// ...do {HipeUniqGuard locker(shared_locker);awake_cv.wait(locker, [this] { return !shared_tq.empty() || shrink_numb > 0; });// ...// ...task = std::move(shared_tq.front()); // 从队列中取出一个任务shared_tq.pop();locker.unlock(); // 解锁,在这里,任务队列的数据是需要锁的tasks_loaded++;util::invoke(task);  // 执行提交的任务--total_tasks;  // ...// ...} while (true);// ...
}
};

上面的代码就比较明白了,然后深入到util::invoke()函数中去看看。invoke()函数的定义如下:

template <typename F, typename... Args>
void invoke(F&& call, Args&&... args) {static_assert(is_runnable<F, Args...>::value, "[HipeError]: Invoke non-runnable object !");call(std::forward<Args>(args)...);  // call的类型为`SafeTask` + () 运算符重载
}class SafeTask {// ...// ...// runnable// 最终通过 () 运算符重载,调用保存的可运行对象void operator()() {exe->call();}
private:// 保存可运行对象的指针,多态std::unique_ptr<BaseExec> exe = nullptr;
};

这样从如何添加任务到如何执行任务的这个流程就明白了。

总结

这一次我们简单看了下DynamicThreadPond如何初始化,如何添加任务,如何执行任务,用到了队列,锁,条件变量,函数模板等相关知识。但是并没有体现动态这一点,下一节我们分析下为什么它是动态线程池

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词