欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > C++高性能编程:使用 Nanomsg 实现基于共享内存的发布-订阅模型

C++高性能编程:使用 Nanomsg 实现基于共享内存的发布-订阅模型

2025/9/19 14:26:31 来源:https://blog.csdn.net/stallion5632/article/details/140966467  浏览:    关键词:C++高性能编程:使用 Nanomsg 实现基于共享内存的发布-订阅模型

0. 概要

Nanomsg在发送消息的时候足够高效,但是由于进程间通信是ipc管道,因此发送文件或者图片的时候实时性不够好。
文件将介绍如何基于Nanomsg框架 实现一个基于共享内存的发布-订阅模型。
本文详细代码:sub_pub_nanomsg_shm
前置阅读: C++编程:使用Nanomsg实现高效的文件和图片传输

1. 订阅者代码解析 (sub_nanomsg_shm.cc)

首先来看 sub_nanomsg_shm.cc 文件中的实现,该文件负责接收发布者发送的数据,并通过共享内存进行通信。

#include "pub_sub_common.h"double total_latency = 0.0;
ipc::route* p_shm_channel = nullptr;
std::chrono::high_resolution_clock::time_point start_time;
std::chrono::high_resolution_clock::time_point end_time;// 信号处理函数
void signal_handler(int signal) {fprintf(stdout, "Received signal %d\n", signal);if (p_shm_channel != nullptr) {p_shm_channel->disconnect();fprintf(stdout, "Disconnected from shared memory channel\n");}
}// 处理接收到的共享内存数据
void shm_consumer(ipc::route& shm_channel) {auto buffer = shm_channel.recv();const uint32_t buffer_size = buffer.size();if (buffer_size < sizeof(uint32_t)) {fprintf(stderr, "buffer_size < 4, received termination signal\n");return;}end_time = std::chrono::high_resolution_clock::now();#if ENABLE_CRC_CHECKuint8_t* buffer_header = reinterpret_cast<uint8_t*>(buffer.data());uint32_t received_length;std::memcpy(&received_length, buffer_header, sizeof(received_length));uint16_t received_crc;std::memcpy(&received_crc, buffer_header + sizeof(received_length), sizeof(received_crc));uint32_t data_length = buffer_size - sizeof(uint32_t) - sizeof(uint16_t);
#if DEBUG_ONfprintf(stdout, "buffer_size: %u, received_length: %u, data_length: %u, received_crc: %hx\n", buffer_size,received_length, data_length, received_crc);
#endifif (received_length != data_length) {
#if DEBUG_ONfprintf(stdout, "data length exceeds buffer size, expected %u, got %u\n", received_length, data_length);
#endifreturn;}uint16_t calculated_crc =calculate_crc2(reinterpret_cast<char*>(buffer_header + sizeof(uint32_t) + sizeof(uint16_t)), received_length);if (calculated_crc != received_crc) {fprintf(stdout, "CRC mismatch: expected %hx, got %hx\n", received_crc, calculated_crc);return;}
#if DEBUG_ONfprintf(stdout, "received data of size %u bytes with valid CRC\n", received_length);
#endif
#endif
}// 运行 Nanomsg 订阅者
void nanomsg_subscriber(std::vector<std::chrono::duration<double, std::micro>>& latencies) {ipc::route shm_channel(kIpcRouteName, ipc::receiver);if (!shm_channel.valid()) {fprintf(stderr, "channel %s is not valid.\n", kIpcRouteName);return;} else {p_shm_channel = &shm_channel;}setup_signal_handlers();int sock = nn_socket(AF_SP, NN_SUB);if (sock < 0) {fprintf(stderr, "Error creating subscriber socket: %s\n", nn_strerror(nn_errno()));return;}if (nn_connect(sock, URL) < 0) {fprintf(stderr, "Error connecting subscriber socket: %s\n", nn_strerror(nn_errno()));nn_close(sock);return;}nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &RECEIVE_TIMEOUT, sizeof(RECEIVE_TIMEOUT));fprintf(stdout, "Subscriber socket connected to %s\n", URL);char* buf = nullptr;nn_pollfd pfd[1];pfd[0].fd = sock;pfd[0].events = NN_POLLIN;uint32_t recv_count{0};fprintf(stdout, "@@ Start to recv, current time: %s\n", get_currenttime_millis().c_str());start_time = std::chrono::high_resolution_clock::now();while (recv_count < NUM_MESSAGES) {int rc = nn_poll(pfd, 1, POLL_TIMEOUT);if (rc < 0) {fprintf(stderr, "Error polling socket: %s\n", nn_strerror(nn_errno()));return;} else if (rc > 0) {if ((pfd[0].revents & NN_POLLIN) != 0U) {int bytes = nn_recv(sock, &buf, NN_MSG, 0);shm_consumer(shm_channel);if (bytes < 0) {fprintf(stderr, "Error receiving message %d: %s\n", recv_count + 1, nn_strerror(nn_errno()));if (buf) {nn_freemsg(buf);}return;}nn_freemsg(buf);recv_count += 1;}} else {
#if DEBUG_ONfprintf(stdout, "nn_poll timeout %d(ms)\n", POLL_TIMEOUT);
#endif}}end_time = std::chrono::high_resolution_clock::now();fprintf(stdout, "@@ Finish recv, current time: %s\n", get_currenttime_millis().c_str());nn_close(sock);
}int main() {std::vector<std::chrono::duration<double, std::micro>> latencies;nanomsg_subscriber(latencies);auto total_latency = end_time - start_time;auto avg_latency = total_latency / NUM_MESSAGES;fprintf(stdout, "Subscriber receive %d messages.\nAverage latency: %lf milliseconds\n", NUM_MESSAGES,std::chrono::duration<double, std::milli>(avg_latency).count());return 0;
}

1.1 代码解析

Start
Initialize Publisher Socket
Bind Publisher Socket
Check Subscriber Ready
Generate Data with CRC
Send Data Using nn_send
End
  • 信号处理函数:用于处理进程信号,确保在接收到信号时正确断开与共享内存的连接。
  • 数据接收和验证:通过共享内存接收数据,并进行CRC校验,确保数据完整性。
  • Nanomsg 订阅者:初始化订阅者套接字,连接到指定URL,接收消息并调用 shm_consumer 处理接收到的数据。

2. 发布者代码解析 (pub_nanomsg.cc)

接下来是 pub_nanomsg.cc 文件的实现,该文件负责生成并发送数据给订阅者。

#include "pub_sub_common.h"bool is_subcriber_ready() {try {FileLock lock;return lock.check();} catch (const std::exception& e) {std::cerr << "Error: " << e.what() << std::endl;return false;}
}void publisher() {int sock = nn_socket(AF_SP, NN_PUB);if (sock < 0) {fprintf(stderr, "Error creating publisher socket: %s\n", nn_strerror(nn_errno()));return;}if (nn_bind(sock, URL) < 0) {fprintf(stderr, "Error binding publisher socket: %s\n", nn_strerror(nn_errno()));nn_close(sock);return;}int read_buf_len{0};size_t sz = sizeof(read_buf_len);nn_getsockopt(sock, NN_SOL_SOCKET, NN_SNDBUF, &read_buf_len, &sz);fprintf(stdout, "### Default NN_SNDBUF size %d.\n", read_buf_len);#if ENABLE_CHANGE_BUFSIZE// 设置发送缓冲区大小nn_setsockopt(sock, NN_SOL_SOCKET, NN_SNDBUF, &BUFFER_SIZE, sizeof(BUFFER_SIZE));nn_getsockopt(sock, NN_SOL_SOCKET, NN_SNDBUF, &read_buf_len, &sz);fprintf(stdout, "### After set NN_SNDBUF size %d.\n", read_buf_len);
#endiffprintf(stdout, "Publisher socket bound to %s\n", URL);while (!is_subcriber_ready()) {std::this_thread::sleep_for(std::chrono::milliseconds(100));}std::vector<uint8_t> large_data;uint32_t data_size = create_large_data(large_data);uint16_t crc = calculate_crc({large_data.begin() + sizeof(uint32_t) + sizeof(uint16_t), large_data.end()});std::memcpy(large_data.data(), &data_size, sizeof(data_size));std::memcpy(large_data.data() + sizeof(data_size), &crc, sizeof(crc));fprintf(stdout, "@@ Start to send, current time: %s\n", get_currenttime_millis().c_str());for (int i = 0; i < NUM_MESSAGES; ++i) {
#if DEBUG_ONfprintf(stdout, "Publisher prepared message %d of size %zu bytes\n", i + 1, large_data.size());
#endifint bytes = nn_send(sock, large_data.data(), large_data.size(), 0);if (bytes < 0) {fprintf(stderr, "Error sending message: %s\n", nn_strerror(nn_errno()));nn_close(sock);return;}
#if DEBUG_ONfprintf(stdout, "Sent message %d of size %zu bytes\n", i + 1, large_data.size());
#endifstd::this_thread::sleep_for(std::chrono::microseconds(THROTTLE_MESSAGE_SLEEP));  // 控制发送频率}fprintf(stdout, "@@ Finish send, current time: %s\n", get_currenttime_millis().c_str());std::this_thread::sleep_for(std::chrono::seconds(4));nn_close(sock);
}int main() {publisher();fprintf(stdout, "Publisher finish send %d messages\n", NUM_MESSAGES);return 0;
}

2.1 代码解析

highlight
Start
Start Subscriber Socket
Connect Subscriber Socket
Poll for Data Using nn_poll
nn_poll
Receive Data Using nn_recv
Process Data and Validate CRC
End
  • 检查订阅者状态:使用文件锁机制检查订阅者是否已准备好接收数据。
  • 初始化发布者套接字:创建并绑定发布者套接字,设置发送缓冲区大小。
  • 数据生成和发送:生成包含 CRC 校验的数据包,并发送给订阅者,同时控制发送频率。

3. 执行结果

发送端

test@t:~/sub_pub_nanomsg_shm/build$ ./pub_nanomsg_shm 
channel recv_count is 1.
### Default NN_SNDBUF size 131072.
Publisher socket bound to ipc:///tmp/pub_sub_nanomsg.ipc
@@ Start to send, current time: 2024-08-06 13:09:57.704
@@ Finish send, current time: 2024-08-06 13:10:02.831
Publisher finish send 10000 messages

接收端

test@t:~/sub_pub_nanomsg_shm/build$ ./sub_nanomsg_shm 
Subscriber socket connected to ipc:///tmp/pub_sub_nanomsg.ipc
@@ Start to recv, current time: 2024-08-06 13:09:56.686
@@ Finish recv, current time: 2024-08-06 13:10:02.831
Subscriber receive 10000 messages.
Average latency: 0.614469 milliseconds

4. 对比直接使用nanomsg 发送文件

C++编程:使用Nanomsg实现高效的文件和图片传输中提到直接使用 Nanomsg 发送消息的结果的平均延时为3.096960 milliseconds

  • 发送时间

    • 使用共享内存:大约 5.127 秒(13:09:57.704 到 13:10:02.831)
    • 直接使用 Nanomsg:大约 30.901 秒(13:56:10.115 到 13:56:41.016)

    使用共享内存的发送时间显著低于直接使用 Nanomsg,这表明共享内存的方式更高效。

  • 接收时间

    • 使用共享内存:大约 6.145 秒(13:09:56.686 到 13:10:02.831)
    • 直接使用 Nanomsg:大约 30.970 秒(13:56:10.044 到 13:56:41.014)

    使用共享内存的接收时间同样显著低于直接使用 Nanomsg。

  • 平均延迟

    • 使用共享内存:0.614469 毫秒
    • 直接使用 Nanomsg:3.096960 毫秒

    使用共享内存的平均延迟显著低于直接使用 Nanomsg,表明共享内存方式不仅在传输速度上有优势,而且在延迟上也表现更好。

  • 缓冲区大小

    • 使用共享内存
      • 发送端默认缓冲区大小为 131072 字节
    • 直接使用 Nanomsg
      • 发送端设置缓冲区大小为 16777216 字节
      • 接收端设置缓冲区大小为 16777216 字节

    尽管直接使用 Nanomsg 时设置了更大的缓冲区,但由于发送和接收过程中数据的处理机制不同,仍然表现出较高的延迟和较慢的速度。

5. 总结

通过对比可以看出,使用共享内存的方式在传输效率和平均延迟方面都优于直接使用 Nanomsg 的方式。这主要是因为共享内存能够更直接地进行数据交换,减少了传输过程中的开销和延迟。因此,在需要高效大规模数据传输的场景中,使用本文提到的方案会是一种更优的选择。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词