文章目录
- 异步工作器
- 设计
- 实现
异步工作器
设计
异步工作是基于之前的双缓冲区的思想,异步工作器的工作就是对处理缓冲区中的数据进行数据处理
如果缓冲区为空,则交换缓冲区
他管理的成员就是这两个缓冲区,需要用互斥锁保证线程安全,条件变量来保证生产与消费
但其实异步工作器本身并不知道应该如何对数据进行处理,他只是调用外部传入的回调函数
实现
/*实现异步工作器
*/
#pragma once
#include "buffer.hpp"
#include <mutex>
#include <condition_variable>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>namespace Xulog
{using Functor = std::function<void(Buffer &)>;enum class AsyncType{ASYNC_SAFE, // 缓冲区满则阻塞ASYNC_UNSAFE // 不考虑资源,无限扩容,性能测试};class AsyncLooper{public:using ptr = std::shared_ptr<AsyncLooper>;AsyncLooper(const Functor &func, AsyncType asynctype = AsyncType::ASYNC_SAFE): _stop(false), _thread(std::thread(&AsyncLooper::threadEntry, this)), _callBack(func), _looper_type(asynctype){}~AsyncLooper(){stop();_cond_con.notify_all(); // 唤醒所有工作线程_thread.join(); // 等待线程}void stop(){_stop = true;_cond_con.notify_all(); // 唤醒所有的工作线程}void push(const char *data, size_t len){std::unique_lock<std::mutex> lock(_mutex);// 条件变量控制,若缓冲区剩余大于等于数据长度,则返回真if (_looper_type == AsyncType::ASYNC_SAFE){_cond_pro.wait(lock, [&](){ return _pro_buf.writeAbleSize() >= len; })}// 满足条件,添加数据_pro_buf.push(data, len);// 唤醒消费者对缓冲区的数据进行处理_cond_con.notify_one();}private:void threadEntry() // 线程入口函数{while (_stop){// 判断生产缓冲区是否有数据,有则交换,无则阻塞{std::unique_lock<std::mutex> lock(_mutex);_cond_con.wait(lock, [&](){ return _stop || !_pro_buf.empty(); });_con_buf.swap(_pro_buf);}// 处理消费缓冲区_callBack(_con_buf);// 初始化消费缓冲区_con_buf.reset();// 唤醒生产者if (_looper_type == AsyncType::ASYNC_SAFE){_cond_pro.notify_all();}}}private:std::atomic<bool> _stop; // 停止标志Buffer _pro_buf; // 生产缓冲区Buffer _con_buf; // 消费缓冲区std::mutex _mutex;std::condition_variable _cond_pro;std::condition_variable _cond_con;std::thread _thread; // 异步工作器的线程AsyncType _looper_type;Functor _callBack; // 回调函数};
}