欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > 高并发场景下的数据一致性问题

高并发场景下的数据一致性问题

2025/5/15 12:43:11 来源:https://blog.csdn.net/qq_57714322/article/details/147958975  浏览:    关键词:高并发场景下的数据一致性问题

比如我们有一个电商系统,我们用户有积分,只要下单就扣减积分,取消订单和退货都返回积分

如果不加锁,我们会遇到什么问题呢?

1. 超卖问题(积分透支)
假设用户当前有 100 积分:

用户 A 同时发起两个订单请求,每个订单都需要 80 积分。
因为没有加锁,两个线程都读取到积分余额为 100,各自判断“足够”并进行扣减。
最终导致扣减了 160 积分,用户余额变成 -60。
这就是经典的并发写入导致的数据不一致问题。

2. 丢失更新积分

在同一时间对数据库中的同一余额值进行修改,可能会遇到并发问题,最典型的包括“丢失更新”问题和“脏读”现象。可能会出现下面的情况

丢失更新(Lost Update):
当两个事务同时读取相同的初始余额值,并基于该值计算新的余额后尝试更新时,最后一个执行的更新会覆盖前一个事务所做的更新,导致前一个事务的更新仿佛被“丢失”了。例如,如果A和B两个用户同时读取到账户余额为100元,A操作增加50元,B操作减少30元,如果没有适当的并发控制,最终余额可能是120元或70元,而不是正确的150元或170元减去30元后的结果。
脏读(Dirty Read):
一个事务能够读取到另一个尚未提交事务的数据。假设事务A修改了一个余额值但还未提交,事务B在此期间读取了这个未提交的数据。如果事务A最终回滚了其更改,那么事务B读取的数据就是不准确的,即所谓的“脏数据”。

乐观锁:

概述:乐观锁是一种并发控制策略,它假设多个事务不会发生冲突,在执行操作时不加锁,非常乐观,只需每次提交时利用标识进行对比,确认其他事务没修改过即可提交。

实现:

使用数据版本(Version)记录机制实现,这是乐观锁最常用的一种实现方式。即为数据增加一个版本标识,一般是给数据库表增加一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新,否则认为是过期数据。     

func (c customUserCardModel) UpdateAllBalance(ctx context.Context, dId int64, saleType int64, balance int64) error {// 查询当前余额和版本号query := "SELECT balance, version FROM distributor WHERE id = ?"res :=new(Version)err := c.conn.QueryRowCtx(ctx, &res, query, dId)if err != nil {return err}// 计算新的余额newBalance := res.Balance - balance// 尝试更新余额及版本号updateQuery := "UPDATE distributor SET balance = ?, version = version + 1 WHERE id = ? AND version = ?"result, err := c.conn.ExecCtx(ctx, updateQuery, newBalance, dId, res.Version)if err != nil {return err}rowsAffected, err := result.RowsAffected()if err != nil {return err}if rowsAffected == 0 {// 如果没有受影响的行,说明版本号已经改变,存在并发冲突   //1.打印日志,//2.返回错误return errors.New("并发更新失败,请重试")}return nil
}

     

重试机制:

通过乐观锁的重试机制,在保证数据一致性的前提下,可以解决由于版本冲突导致的放弃更新问题。

乐观锁冲突重试机制,重试3次:

参考这篇文章,也详细了:跳转

适用场景:乐观锁适合并发冲突少,读多写少的场景,不用通过加锁只需通过比较字段版本号(或时间戳)是否发生改变的形式,无锁操作,吞吐量较高。

悲观锁:

概述:
悲观锁认为每次操作都会发生冲突,非常悲观。它会在任何可能发生冲突的地方进行加锁,其他操作想修改都需要等它执行完后释放锁,再通过争抢到锁而进行操作。

实现:
使用悲观锁来解决并发冲突的问题,可以在查询库存时使用SELECT ... FOR UPDATE语句来获取悲观锁。这样可以确保在事务中对查询结果加锁,避免其他事务对查询结果进行修改。

例如:

func (c customUserCardModel) UpdateAllBalance(dId int64) error {
//适用于对金额敏感的系统	
distributorQuery := `update distributor set balance = balance - ? where id = ?`err := c.conn.TransactCtx(context.Background(), func(ctx context.Context, session sqlx.Session) error {query := "SELECT balance FROM distributor WHERE id = ? FOR UPDATE"  //悲观锁var balance int64err := session.QueryRowCtx(ctx, &balance, query, dId)if err != nil {return err}// 然后再执行 update_, err = session.ExecCtx(ctx, distributorQuery, balance, dId)return nil})return err
}

使用场景

悲观锁适合并发冲突多,写多读少的场景。通过每次加锁的形式来确保数据的安全性,吞吐量较低.

java自带的锁:

在单体应用中,SynchronizedReentrantLock 都可以用来保证数据的一致性。它们通过控制对共享资源的访问来防止并发冲突,确保多个线程同时操作共享资源时不会导致数据不一致的问题。

  • synchronized

  • ReentrantLock

// 使用ReentrantLock锁解决单体应用的并发问题
Lock lock = new ReentrantLock();@Autowired
StringRedisTemplate template;@RequestMapping("/buy2")
public String index() {lock.lock();try {String result = template.opsForValue().get("goods:001");int total = result == null ? 0 : Integer.parseInt(result);if (total > 0) {int realTotal = total - 1;template.opsForValue().set("goods:001", String.valueOf(realTotal));System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001");return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001";} else {System.out.println("购买商品失败,服务端口为8001");}} catch (Exception e) {lock.unlock();} finally {lock.unlock();}return "购买商品失败,服务端口为8001";
}

分布式锁:

如果是分布式系统,单体应用的锁就不行了,原因如下:

Synchronized 和 ReentrantLock 主要是在单一JVM内部工作的,这意味着它们无法直接应用于跨多个JVM或服务器节点的场景。以下是几个主要原因:

进程隔离:分布式系统由分布在不同机器上的多个进程组成,每个进程都有自己的内存空间。因此,基于JVM内部的锁机制不能跨越这些边界来协调资源访问。
网络分区和延迟:在网络环境中,消息传递可能会有延迟或者丢失,这使得实现分布式锁更加困难。简单的锁机制无法处理这种不确定性。
容错性要求:在分布式环境中,必须考虑到节点可能失败的情况。分布式锁解决方案需要具备一定的容错能力,比如能够在某个节点失效后仍然能够保持系统的可用性和数据的一致性。
一致性模型:分布式系统通常采用更复杂的共识算法(如Paxos或Raft)来达成一致,而不是简单的锁机制。这是因为简单锁机制难以满足分布式环境下对于高可用性和最终一致性或强一致性的需求。

导入依赖:

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>

配置客户端:

@Configuration
public class RedissonConfig {
​@Beanpublic RedissonClient redissonClient(){// 配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.xxx.xxx:6379").setPassword("xxxxx");  //用自己的redis连接地址// 创建RedissonClient对象return Redisson.create(config);}
}

使用:

@Resource
private RedissionClient redissonClient;
​
@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务");          }finally{//释放锁lock.unlock();}}}
@Resource
private RedissonClient redissonClient;
​
@Override
public Result seckillVoucher(Long voucherId) {// 1.查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀尚未开始!");}// 3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 尚未开始return Result.fail("秒杀已经结束!");}// 4.判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁对象boolean isLock = lock.tryLock();//加锁失败if (!isLock) {return Result.fail("不允许重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {//释放锁lock.unlock();}}

总结:

单机部署、小并发    数据库悲观锁(简单可靠)
分布式系统、高并发    Redis 分布式锁 + 降级策略
对一致性要求不极端、可接受重试 ,   乐观锁

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词