欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > windows下命名管道双端通信

windows下命名管道双端通信

2025/5/16 0:11:34 来源:https://blog.csdn.net/Wite_Chen/article/details/147121542  浏览:    关键词:windows下命名管道双端通信

实现功能

1、命名管道双端通信(异步)
2、客户端断线重连
PS:多线程版本

PipeWrapper.h

#include <windows.h>
#include <string>
#include <vector>
#include "Utils/ThreadObject.h"
#include "Utils/ThreadPool.h"// pipe server
class AsyncPipeServer
{const int kPipeTimeout = 5000;static const int kBufferSize = 4096;
public:using CallbackPipeClientMsg = std::function<void(const std::string&, HANDLE)>;AsyncPipeServer(const std::string& strName, int nMaxClients = 10): m_strPipeName("\\\\.\\pipe\\" + strName),m_nMaxClients(nMaxClients),m_bRunning(false){m_poolHandleClient.Start(1, m_nMaxClients);}~AsyncPipeServer() { Stop(); }bool Start(){if (m_bRunning) return true;m_bRunning.store(true);m_thWorker.StartOnce(std::bind(&AsyncPipeServer::loop, this));return true;}void Stop(){m_bRunning.store(false);m_thWorker.Stop();std::lock_guard<std::mutex> lock(m_mtClient);for (auto& client : m_listClients) {disconnect_and_close(client);}m_listClients.clear();}void SetMessageHandler(CallbackPipeClientMsg cbMsg){m_funcMsgHandler = cbMsg;}bool Send(HANDLE hPipe, const std::string& strMsg){if (!hPipe || INVALID_HANDLE_VALUE == hPipe) return false;OVERLAPPED ov = { 0 };DWORD written;bool bSucc = WriteFile(hPipe,strMsg.c_str(),static_cast<DWORD>(strMsg.size()),&written,&ov);if (!bSucc && GetLastError() == ERROR_IO_PENDING) {WaitForSingleObject(ov.hEvent, INFINITE);return true;}return bSucc;}void Broadcast(const std::string& message){std::lock_guard<std::mutex> lock(m_mtClient);for (auto& client : m_listClients) {Send(client, message);}}private:void loop(){while (m_bRunning) {HANDLE hPipe = CreateNamedPipeA(m_strPipeName.c_str(),PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,m_nMaxClients,kBufferSize,kBufferSize,kPipeTimeout,NULL);if (hPipe == INVALID_HANDLE_VALUE) break;OVERLAPPED ov = { 0 };ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);if (!ConnectNamedPipe(hPipe, &ov)) {if (GetLastError() != ERROR_IO_PENDING) {CloseHandle(hPipe);continue;}}DWORD wait = WaitForSingleObject(ov.hEvent, INFINITE);if (wait == WAIT_OBJECT_0) {DWORD bytesTransferred;GetOverlappedResult(hPipe, &ov, &bytesTransferred, FALSE);{std::lock_guard<std::mutex> lock(m_mtClient);m_listClients.push_back(hPipe);}m_poolHandleClient.Commit(std::bind(&AsyncPipeServer::handle_client_message, this, hPipe));}CloseHandle(ov.hEvent);}}void handle_client_message(HANDLE hPipe){char szMsg[kBufferSize];DWORD nReadByte = 0;while (m_bRunning) {OVERLAPPED ov = { 0 };ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);if (!ReadFile(hPipe, szMsg, kBufferSize, &nReadByte, &ov)) {if (GetLastError() != ERROR_IO_PENDING) break;DWORD wait = WaitForSingleObject(ov.hEvent, INFINITE);if (wait != WAIT_OBJECT_0) break;GetOverlappedResult(hPipe, &ov, &nReadByte, FALSE);}if (0 == nReadByte) {std::cout << "Client disconnected\n";}else {if(m_funcMsgHandler)m_funcMsgHandler(std::string(szMsg, nReadByte), hPipe);}CloseHandle(ov.hEvent);}disconnect_and_close(hPipe);}void disconnect_and_close(HANDLE hPipe){if (hPipe != INVALID_HANDLE_VALUE) {DisconnectNamedPipe(hPipe);CloseHandle(hPipe);}std::lock_guard<std::mutex> lock(m_mtClient);m_listClients.erase(std::remove(m_listClients.begin(), m_listClients.end(), hPipe), m_listClients.end());}private:std::string m_strPipeName;int m_nMaxClients;std::atomic<bool> m_bRunning;Utils::ThreadObject m_thWorker;std::vector<HANDLE> m_listClients;std::mutex m_mtClient;CallbackPipeClientMsg m_funcMsgHandler;Utils::ThreadPool m_poolHandleClient;
};// pipe client
class AsyncPipeClient
{static const int kBufferSize = 4096;
public:using CallbackPipeServerMsg = std::function<void(const std::string&)>;AsyncPipeClient(const std::string& strServer, const std::string& strName): m_strPipeName("\\\\" + strServer + "\\pipe\\" + strName),m_bConnect(false){}~AsyncPipeClient() { Disconnect(); }bool Connect(int nTimeout = 30000){m_nWaitTimeout = nTimeout;if (m_bConnect) return true;m_objReconnect.Start(std::bind(&AsyncPipeClient::reconnect, this), 10 * 1000);return true;}void Disconnect(){m_bConnect = false;close_pipe();m_objReconnect.Stop();m_objReadMsg.Stop();}bool Send(const std::string& strMsg){if (!m_bConnect) return false;OVERLAPPED ov = { 0 };DWORD written;bool bSucc = WriteFile(m_hPipe,strMsg.c_str(),static_cast<DWORD>(strMsg.size()),&written,&ov);if (!bSucc && GetLastError() == ERROR_IO_PENDING){WaitForSingleObject(ov.hEvent, INFINITE);return true;}return bSucc;}void SetMessageHandler(CallbackPipeServerMsg cbMsg){m_funcMsgHandler = cbMsg;}private:void read_msg(){char szMsg[kBufferSize] = { 0 };DWORD nReadBytes = 0;while (m_bConnect.load()) {OVERLAPPED ov = { 0 };ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);if (!ReadFile(m_hPipe, szMsg, kBufferSize, &nReadBytes, &ov)) {if (GetLastError() != ERROR_IO_PENDING) break;DWORD wait = WaitForSingleObject(ov.hEvent, INFINITE);if (wait != WAIT_OBJECT_0) break;GetOverlappedResult(m_hPipe, &ov, &nReadBytes, FALSE);}if (0 == nReadBytes) {m_bConnect = false;std::cout << "Server disconnected\n";}else {if (m_funcMsgHandler)m_funcMsgHandler(std::string(szMsg, nReadBytes));}CloseHandle(ov.hEvent);}}void reconnect(){if (m_bConnect.load()) return;close_pipe();if (WaitNamedPipeA(m_strPipeName.c_str(), m_nWaitTimeout)) {m_hPipe = CreateFileA(m_strPipeName.c_str(),GENERIC_READ | GENERIC_WRITE,0,NULL,OPEN_EXISTING,FILE_FLAG_OVERLAPPED,NULL);if (m_hPipe != INVALID_HANDLE_VALUE) {DWORD mode = PIPE_READMODE_MESSAGE;if (SetNamedPipeHandleState(m_hPipe, &mode, NULL, NULL)) {m_bConnect.store(true);std::cout << "Connect server...\n";m_objReadMsg.StartOnce(std::bind(&AsyncPipeClient::read_msg, this));return;}}}}void close_pipe(){if (m_hPipe != INVALID_HANDLE_VALUE) {CancelIo(m_hPipe);CloseHandle(m_hPipe);m_hPipe = INVALID_HANDLE_VALUE;}}private:std::string m_strPipeName;HANDLE m_hPipe = INVALID_HANDLE_VALUE;std::atomic<bool> m_bConnect;Utils::ThreadObject m_objReadMsg;CallbackPipeServerMsg m_funcMsgHandler;Utils::ThreadObject m_objReconnect;int m_nWaitTimeout;
};

main.cpp

#include <iostream>
#include <string>
#include "PipeWrapper.h"void startServer()
{AsyncPipeServer server("testpipe");server.SetMessageHandler([&](const std::string& msg, HANDLE client) {std::cout << "Received: " << msg << std::endl;server.Send(client, "Echo: " + msg);});server.Start();// 保持主线程运行while (true) {std::this_thread::sleep_for(std::chrono::seconds(1));}
}void startClient()
{std::string strClient = std::to_string(time(nullptr));AsyncPipeClient client(".", "testpipe");client.SetMessageHandler([](const std::string& msg) {std::cout << "Server response: " << msg << std::endl;});if (client.Connect()) {//for (int i = 0; i < 5; ++i)int nIndex = 0;for(;;){client.Send("["+ strClient + "] Hello " + std::to_string(++nIndex));std::this_thread::sleep_for(std::chrono::seconds(1));}}getchar();client.Disconnect();
}int main(int argc, char* argv[])
{
#if 1bool bServer = true;
#elsebool bServer = false;#endifif (1 == argc && bServer) {startServer();}else {startClient();}getchar();return 0;
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词