欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > Redis Pub/Sub模式:分布式系统中的解耦利器

Redis Pub/Sub模式:分布式系统中的解耦利器

2025/5/7 3:54:35 来源:https://blog.csdn.net/yqq962464/article/details/141892137  浏览:    关键词:Redis Pub/Sub模式:分布式系统中的解耦利器

序言

Redis的发布订阅(Pub/Sub)是一种消息通信模式,允许发布者(Publisher)发送消息到频道(Channel),而订阅者(Subscriber)可以订阅一个或多个频道来接收消息。

Redis 的发布订阅功能命令:PUBLISHSUBSCRIBEPSUBSIRIBE 等。

基础使用

频道订阅

执行SUBSCRIBE命令,客户端可以订阅一个或多个频道,从而成为这些频道的订阅者,当其他客户端向被订阅的频道发送消息时,频道的所有订阅者都会受到该消息。
在这里插入图片描述
假设客户端 A 和客户端 B 都执行了SUBSCRIBE news.it,那么他们就是频道news.it的订阅者。

消息发布

假设客户端 C执行 PUBLISH news.it hello,向频道news.it发送消息,那么客户端 A 和客户端 B 都会收到消息。
在这里插入图片描述

频道退订

当客户端 B 执行UNSUBSCRIBE news.it命令,则它与频道之间的订阅关系就被解除了,当该频道再有消息时,客户端 B 就收不到消息了。
在这里插入图片描述

实现原理

SUBSCRIBE

当客户端执行 SUBSCRIBE 命令订阅某个频道的时候,则客户端就与被订阅的频道之间建立了一种订阅关系。
Redis 是将所有频道的订阅关系都保存在服务器状态的pubsub_channels 字典里。

  • 键:被订阅频道的名称;
  • 值:一个链表,记录了所有订阅这个频道的客户端。
    在这里插入图片描述
    比如上图字典就记录了
  • client-1、client-2、client-3 订阅了 news.it 频道;
  • client-4 订阅了 news.sport 频道;
  • client-5、client-6 订阅了 news.business 频道。

那么在内部,这些订阅关系是怎么维护的呢?
根据频道是否有其他订阅者,分为两种情况:

  1. 如果频道有其他订阅者,那么它在 pubsub_channel 字典中必然有相对应的订阅者链表,那么就将新客户端添加到链表的尾部;
  2. 如果频道没有订阅者,那么它在 pubsub_channel 字典中就不存在,首先要在 pubsub_channel 字典中为该频道创建一个键,并将这个键的值设置为空链表,然后将客户端添加到链表的头节点。

伪代码:

public class SubscribeDemo {private static Map<String, LinkedList> map = new HashMap<>();public static void main(String[] args) {subscribe(Arrays.asList("news.it", "news.support"), "c1");subscribe(Arrays.asList("news.it"), "c2");map.forEach((k,v) -> {System.out.println("频道:" + k +" 订阅者:" +v);});}private static void subscribe(List<String> channels, String client) {for (String channel : channels) {if (map.containsKey(channel)) {LinkedList linkedList = map.get(channel);linkedList.addLast(client);} else {LinkedList<Object> node = new LinkedList<>();node.addFirst(client);map.put(channel, node);}}}
}

在这里插入图片描述

UNSUBSCRIBE

当客户端退订频道的时候,服务器将从pubsub_channels中解除客户端与被退订频道之间的关联。

  1. 根据被退订频道的名字,从字典中找到对应的订阅者链表,从链表中删除退订客户端的信息;
  2. 删除退订客户端后,如果频道的订阅者链表为空,则说明该频道已经没有任何订阅者了,则从字典中删除该频道信息。

比如此时客户端 4 执行 UNSUBSCRIBE news.sport
在这里插入图片描述
那么执行之后的字典信息是
在这里插入图片描述
伪代码:

private static void unsubscribe(List<String> channels, String client) {for (String channel : channels) {System.out.println(client + " 退定频道:" + channel);if (map.containsKey(channel)) {LinkedList subscribers = map.get(channel); // 该频道的订阅者列表int index = subscribers.indexOf(client); // 查找退订的客户端subscribers.remove(index); // 从链表中删除if (subscribers.isEmpty()) {//如果该频道的订阅者为空,则从字典中删除map.remove(channel);}}}}

PUBLISH

当客户端执行PUBLISH <channel> <message> 命令将消息发送给频道channel 的时候,则服务器需要执行以下操作:

  1. 将消息message发送给channel 频道的所有订阅者;
private static void publish(String channel, String message) {if (!map.containsKey(channel)) {return;}LinkedList nodes = map.get(channel);for (Object client : nodes) {System.out.println("给客户端:" + client + " 发送消息:" + message);}}

完整伪代码

public class SubscribeDemo {private static Map<String, LinkedList> map = new HashMap<>();public static void main(String[] args) {subscribe(Arrays.asList("news.it", "news.support"), "c1");subscribe(Arrays.asList("news.it"), "c2");map.forEach((k, v) -> {System.out.println("频道:" + k + " 订阅者:" + v);});System.out.println("-------------------");unsubscribe(Arrays.asList("news.support"), "c1");map.forEach((k, v) -> {System.out.println("频道:" + k + " 订阅者:" + v);});System.out.println("-------------------");publish("news.it", "测试消息");}private static void subscribe(List<String> channels, String client) {for (String channel : channels) {if (map.containsKey(channel)) {//如果订阅的频道在字典中,则将新的客户端添加到链表的尾部LinkedList linkedList = map.get(channel);linkedList.addLast(client);} else {//如果订阅的频道在字典中不存在,则将新的客户端添加的链表的头部LinkedList<Object> node = new LinkedList<>();node.addFirst(client);map.put(channel, node);}}}private static void unsubscribe(List<String> channels, String client) {for (String channel : channels) {System.out.println(client + " 退定频道:" + channel);if (map.containsKey(channel)) {LinkedList subscribers = map.get(channel); // 该频道的订阅者列表int index = subscribers.indexOf(client); // 查找退订的客户端subscribers.remove(index); // 从链表中删除if (subscribers.isEmpty()) {//如果该频道的订阅者为空,则从字典中删除map.remove(channel);}}}}private static void publish(String channel, String message) {//如果频道不在字典中,返回if (!map.containsKey(channel)) { return;}LinkedList nodes = map.get(channel);//遍历频道的订阅者列表,并发送消息for (Object client : nodes) {System.out.println("给客户端:" + client + " 发送消息:" + message);}}
}

在这里插入图片描述

使用场景

  • 实时消息系统:如聊天应用、新闻更新推送;
  • 事件通知:如用户行为触发的通知,订单状态变更通知;
  • 分布式系统中的数据同步:如数据库的主从复制状态同步。

优缺点

优点

  • 简单易用:通过简单的命令即可实现发布和订阅功能;
  • 低延迟:消息传递速度快,适用于需要快速响应的场景;
  • 可扩展性:可以轻松地添加更多的订阅者。

缺点

  • 消息无持久化:Redis不会存储发布的消息,如果订阅者不在线,将错过消息;
  • 资源消耗:每个订阅者都需要维护一个与Redis服务器的连接,可能会导致资源消耗;
  • 缺乏高级功能:与专业的分布式消息队列系统相比,缺乏消息确认、持久化、事务、死信队列等高级功能。

总结

Redis的发布订阅模式是一种轻量级的消息传递机制,适用于需要快速、简单消息传递的场景。然而,对于需要高可靠性、高吞吐量和复杂消息处理能力的场景,可能需要考虑使用专业的分布式消息队列系统。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词