欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > IT业 > 利用Redisson分布式锁解决多服务器数据刷新问题

利用Redisson分布式锁解决多服务器数据刷新问题

2025/5/3 0:53:13 来源:https://blog.csdn.net/jike11231/article/details/147643114  浏览:    关键词:利用Redisson分布式锁解决多服务器数据刷新问题

利用Redisson分布式锁解决多服务器数据刷新问题

  • 一、业务背景
  • 二、代码实现
    • 1、引入Redisson依赖
    • 2、配置Redisson,实际项目中Redis为集群配置
    • 3、自定义拒绝策略
    • 4、异步刷新网元服务
  • 三、项目结构及源码

一、业务背景

最近有个需求需要自动刷新网元服务,由于我们生产环境数据库是多台服务器,刷新网元可能导致的数据不一致问题‌,所以采用Redisson分布式锁方式实现这个业务功能。

二、代码实现

1、引入Redisson依赖

        <!-- redis --><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>2.7.5</version></dependency><!-- redisson --><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.16.0</version></dependency>

2、配置Redisson,实际项目中Redis为集群配置

@Configuration
public class RedisssionConfig {@Value("${spring.redis.password}")private String password;@Value("${spring.redis.host}")private String host;@Value("${spring.redis.port}")private String port;/*** 对 Redisson 的使用都是通过 RedissonClient 对象** @return*/@Bean(name = "redissonClient", destroyMethod = "shutdown") // 服务停止后调用 shutdown 方法public RedissonClient redissonClient() {// 1、创建配置Config config = new Config();// 2、集群模式// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001");// 根据 Config 创建出 RedissonClient 示例config.useSingleServer().setPassword(StringUtils.isEmpty(password) ? null : password).setAddress(host.contains("://") ? "" : "redis://" + host + ":" + port);return Redisson.create(config);}
}

3、自定义拒绝策略

@Component
public class RefreshNodeRejectHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 拒绝将任务保存起来if (!executor.isShutdown()) {saveRejectedTask(r);}}class TaskInfo implements Runnable {private final List<NodeVo> nodeVos;TaskInfo(List<NodeVo> nodeVos) {this.nodeVos = nodeVos;}@Overridepublic void run() {System.out.println("执行了TaskInfo的run方法,执行了某某业务逻辑");}public List<NodeVo> getNodeVos() {return nodeVos;}}private void saveRejectedTask(Runnable r) {if (r instanceof TaskInfo) {TaskInfo taskInfo = (TaskInfo) r;List<NodeVo> nodeVos = taskInfo.getNodeVos();for (NodeVo node : nodeVos) {System.out.println("保存了被拒绝的任务:" + node);}}}
}

4、异步刷新网元服务

@Service
public class AsynFleshNodeService implements ApplicationRunner {private ThreadPoolExecutor threadPoolExecutor;@Autowiredprivate RefreshNodeRejectHandler refreshNodeRejectHandler;@Autowiredprivate RedissonClient redissionClient;@Autowiredprivate NodeMapper nodeMapper;@Overridepublic void run(ApplicationArguments args) throws Exception {ThreadPoolExecutor singleThreadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(1000));singleThreadPoolExecutor.execute(this::initTask);}private void initTask() {if (!openFleshNode()) {return;}// 创建线程池,线程队列大小为10000,线程队列满时拒绝任务,拒绝策略为自定义的RefreshNodeRejectHandlerthreadPoolExecutor = new ThreadPoolExecutor(1, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000),Executors.defaultThreadFactory(), refreshNodeRejectHandler);while (executeRunningData()) {// 如果线程队列大于6000或者线程队列为空且线程数小于等于0,则线程休眠5sif (threadPoolExecutor.getQueue().size() >= 6000 || (threadPoolExecutor.getQueue().isEmpty()&& threadPoolExecutor.getActiveCount() == 0)) {try {// 线程休眠5slong sleepTimes = 5000;Thread.sleep(sleepTimes);} catch (InterruptedException e) {System.out.println("线程休眠异常");}}}}private boolean openFleshNode() {// 从数据字典获取打开开关String isOpen = "Y";return StringUtils.equals(isOpen, "Y");}private boolean paushFleshNode() {// 从数据字典获取暂停开关String isPause = "N";return StringUtils.equals(isPause, "Y");}// 处理running数据,设置为true代表一直执行的方法protected boolean executeRunningData() {while (!paushFleshNode() && CollectionUtils.isNotEmpty(nodeMapper.getNodeList())) {// 这里锁名建议定义枚举类RLock lock = redissionClient.getLock("fleshNode");List<NodeVo> nodeVoList = new ArrayList<>();try {// 设置等待时间10s,锁过期时间60sif (!lock.tryLock(10, 60, TimeUnit.SECONDS)) {continue;}System.out.println("执行刷新网元逻辑,写入数据库");} catch (InterruptedException e) {System.out.println("获取锁失败");} finally {// 释放锁if (lock.isLocked() && lock.isHeldByCurrentThread()) {lock.unlock();}}for (NodeVo nodeVo : nodeVoList) {threadPoolExecutor.execute(() -> { System.out.println("执行网元其它操作业务,写入数据库:" + nodeVo); });}}return openFleshNode();}
}

三、项目结构及源码

在这里插入图片描述
源码下载,欢迎Star: demo-springboot-mybatisplus

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词