ChatServer
一个TCP服务器必然会有连接的接收、维持、收发数据等逻辑,因此我们要基于asio完成这个服务的搭建。主服务的代码如下:
#include "LogicSystem.h"
#include <csignal>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <thread>
#include <mutex>
#include "AsioIOServicePool.h"
#include "CServer.h"
#include "ConfigMgr.h"
using namespace std;

// NOTE(review): these three globals are not referenced anywhere in this
// translation unit; they are kept in case other files use them — confirm.
bool bstop = false;
std::condition_variable cond_quit;
std::mutex mutex_quit;

/// Entry point of the chat server.
///
/// Reads the listening port from the "SelfServer"/"Port" configuration entry,
/// installs a SIGINT/SIGTERM handler that stops both the accepting io_context
/// and the worker pool, then runs the accepting io_context on the calling
/// thread until it is stopped.
///
/// @return EXIT_SUCCESS after a normal shutdown, EXIT_FAILURE when the
///         configured port is invalid or an exception escapes the server.
int main()
{
    try {
        auto &cfg = ConfigMgr::Inst();
        auto pool = AsioIOServicePool::GetInstance();

        boost::asio::io_context io_context;
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&io_context, pool](auto, auto) {
            // Stop accepting first, then shut the worker pool down.
            io_context.stop();
            pool->Stop();
        });

        auto port_str = cfg["SelfServer"]["Port"];
        const int port = atoi(port_str.c_str());
        // atoi yields 0 for missing/garbage input; reject that (and anything
        // outside the TCP port range) instead of silently binding elsewhere.
        if (port <= 0 || port > 65535) {
            std::cerr << "Invalid SelfServer.Port: " << port_str << endl;
            return EXIT_FAILURE;
        }

        // CServer's constructor takes a `short`; make the narrowing explicit.
        CServer s(io_context, static_cast<short>(port));
        io_context.run();
    }
    catch (const std::exception& e) {
        std::cerr << "Exception: " << e.what() << endl;
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
CServer类的声明
#pragma once
#include <boost/asio.hpp>
#include "CSession.h"
#include <memory.h>
#include <map>
#include <memory>
#include <mutex>
#include <string>
using namespace std;
using boost::asio::ip::tcp;

/// TCP server that owns the listening acceptor and the table of sessions.
class CServer
{
public:
    /// Binds an IPv4 acceptor on `port` using `io_context` and starts accepting.
    /// @param io_context  io_context the acceptor is registered with.
    /// @param port        Port to listen on.
    CServer(boost::asio::io_context& io_context, short port);
    ~CServer();

    /// Clears a session from the server.
    /// NOTE(review): the argument is presumably the key used in `_sessions`
    /// — confirm against the definition.
    void ClearSession(std::string);

private:
    /// Completion handler for an accept started by StartAccept().
    void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);

    /// Creates a new session and starts an asynchronous accept into its socket.
    void StartAccept();

    boost::asio::io_context &_io_context;  // io_context passed to the constructor
    short _port;                           // listening port as passed to the constructor
    tcp::acceptor _acceptor;               // listening socket
    std::map<std::string, shared_ptr<CSession>> _sessions;  // string key -> session
    std::mutex _mutex;                     // mutex member; presumably guards _sessions — confirm
};
构造函数中监听对方连接
/// Constructs the server: binds an IPv4 acceptor on `port` and immediately
/// starts the first asynchronous accept.
///
/// @param io_context  io_context the acceptor is registered with.
/// @param port        Listening port. The parameter is a signed `short`, so a
///                    port above 32767 arrives here as a negative value; it is
///                    converted back to its unsigned form wherever it is used.
CServer::CServer(boost::asio::io_context& io_context, short port)
    : _io_context(io_context),
      _port(port),
      _acceptor(io_context, tcp::endpoint(tcp::v4(), static_cast<unsigned short>(port)))
{
    // Print the unsigned value so ports above 32767 are not logged as negative.
    cout << "Server start success, listen on port : "
         << static_cast<unsigned short>(_port) << endl;
    StartAccept();
}
接受连接的函数
void CServer::StartAccept() {auto &io_context = AsioIOServicePool::GetInstance()->GetIOService();shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));}
AsioIOServicePool
从AsioIOServicePool中返回一个可用的io_context来构造Session,然后将新接受连接的socket交给这个Session保管。
AsioIOServicePool已经在前面讲解很多次了,它的声明如下
#include <vector>#include <boost/asio.hpp>#include "Singleton.h"class AsioIOServicePool:public Singleton<AsioIOServicePool>{friend Singleton<AsioIOServicePool>;public:using IOService = boost::asio::io_context;using Work = boost::asio::io_context::work;using WorkPtr = std::unique_ptr<Work>;~AsioIOServicePool();AsioIOServicePool(const AsioIOServicePool&) = delete;AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;// 使用 round-robin 的方式返回一个 io_serviceboost::asio::io_context& GetIOService();void Stop();private:AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());std::vector<IOService> _ioServices;std::vector<WorkPtr> _works;std::vector<std::thread> _threads;std::size_t _nextIOService;};
AsioIOServicePool具体实现
#include "AsioIOServicePool.h"
#include <iostream>
#include <memory>
using namespace std;

/// Creates the io_contexts, attaches a work object to each so run() does not
/// return while idle, and starts one thread per io_context.
///
/// @param size  Requested number of io_contexts. A value of 0 (which
///              std::thread::hardware_concurrency() is allowed to report) is
///              raised to 1 so GetIOService() always has something to return.
AsioIOServicePool::AsioIOServicePool(std::size_t size)
    : _ioServices(size == 0 ? 1 : size),
      _works(_ioServices.size()),
      _nextIOService(0)
{
    for (std::size_t i = 0; i < _ioServices.size(); ++i) {
        _works[i] = std::make_unique<Work>(_ioServices[i]);
    }

    // Walk the io_contexts and start one thread for each; every thread runs
    // its own io_context.
    _threads.reserve(_ioServices.size());
    for (std::size_t i = 0; i < _ioServices.size(); ++i) {
        _threads.emplace_back([this, i]() {
            _ioServices[i].run();
        });
    }
}

/// Shuts the pool down if that has not happened yet, then logs destruction.
AsioIOServicePool::~AsioIOServicePool()
{
    // Destroying a still-joinable std::thread calls std::terminate, so make
    // sure the workers are stopped and joined even if Stop() was never called.
    Stop();
    std::cout << "AsioIOServicePool destruct" << endl;
}

/// Returns the next io_context in round-robin order.
/// Not synchronized: `_nextIOService` is read and written without a lock.
boost::asio::io_context& AsioIOServicePool::GetIOService()
{
    auto& service = _ioServices[_nextIOService++];
    if (_nextIOService == _ioServices.size()) {
        _nextIOService = 0;
    }
    return service;
}

/// Stops every io_context, releases its work object and joins the worker
/// threads. Safe to call more than once.
void AsioIOServicePool::Stop()
{
    // Resetting the work object alone does not make run() return once the
    // io_context has read/write handlers registered, so each io_context is
    // stopped explicitly as well.
    for (std::size_t i = 0; i < _ioServices.size(); ++i) {
        // Stop the service first, then drop its work object.
        _ioServices[i].stop();
        _works[i].reset();
    }

    for (auto& t : _threads) {
        // Already-joined threads (from an earlier Stop()) are skipped.
        if (t.joinable()) {
            t.join();
        }
    }
}
CServer的处理连接逻辑
void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){if (!error) {new_session->Start();lock_guard<mutex> lock(_mutex);_sessions.insert(ma
