Redis 解决卖超问题

语言: CN / TW / HK

本文从Redis出发,搭建模拟高并发场景并实现超卖情况,再从redis的角度解决这个问题。场景也是从单机环境到集群环境,模拟此操作,需要jmeter工具和单个redis服务(因为是模拟,只是为了测线程安全,无需集群)

前言:

需要有可用的 Redis 及 SpringBoot 整合 Redis 的基础操作,jmeter 工具 (jmeter下载地址:https://jmeter.apache.org/download_jmeter.cgi)

浪费别人的时间就是在谋财害命

文章涉及源码:Gitee地址

https://gitee.com/Array_Xiang/spring-boot-redis.git

  • 先写一段简单的代码
    • 解决方案一:设置 synchronzied
    • 解决方案二:Redis setnx实现分布式锁
    • 解决方案三:Redisson API
    • 解决方案四:RedLock 高可用并发锁

先写一段简单的代码

两个接口,创建一个stock 商品设置200个库存

另一个接口,获取 redis 的库存数,判断是否有库存,如果有,就取出来-1再放回去。

``` package com.liuyuncen.shop.controller;

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;

/*  * @belongsProject: radis_springboot  * @belongsPackage: com.liuyuncen.shop.controller  * @author: Xiang想  * @createTime: 2022-09-05  16:59  * @description: TODO  * @version: 1.0  / @RestController public class ShopController {

@Autowired     StringRedisTemplate stringRedisTemplate;

/      * @description: 创建一个库存量为200的商品      * @author: Xiang想      * @date: 2022/9/5 5:00 PM      * @param: []      * @return: java.lang.String      /     @RequestMapping("/setStock")     public String setStock(){         stringRedisTemplate.opsForValue().set("stock",200+"");         return "ok";     }

/      * @description: 卖出商品,有库存就减1 没有库存就打印无库存日志      * @author: Xiang想      * @date: 2022/9/5 5:02 PM      * @param: []      * @return: java.lang.String      /     @RequestMapping("/deductStock")     public String deductStock(){         int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));         if (stock>0){             int realStock = stock - 1;             stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));             System.out.println("商品扣减成功,剩余商品:"+realStock);         }else {             System.out.println("库存不足....");         }         return "end";     } } ```

写好了,启动服务

图片

先执行第一个接口,创建一个stock 商品,然后创建 jmeter 测试案例

图片

图片

如果不知道怎么用,看这个链接:https://blog.csdn.net/bin0503/article/details/123543484。创建完成后,点击上面的绿色箭头

图片

看到这里,大家就知道发生了啥了,一个库存被卖了七八次,肯定是卖超了,但是有问题了,redis 是单线程的呀,为啥会卖超呢?


解决方案一:设置 synchronzied

既然因为并发太高导致的问题,那肯定和线程有关,我们尝试加个锁呗

先调用第一个接口,重置库存数,使用客户端命令排查一下

图片

添加 synchronized 关键字

/**      * @description: 在卖出的服务中添加 synchronized 关键字 添加锁      * @author: Xiang想      * @date: 2022/9/5 5:02 PM      * @param: []      * @return: java.lang.String      **/     @RequestMapping("/syncDeductStock")     public String syncDeductStock(){         synchronized (this){             int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));             if (stock>0){                 int realStock = stock - 1;                 stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));                 System.out.println("商品扣减成功,剩余商品:"+realStock);             }else {                 System.out.println("库存不足....");             }             return "end";         }     }

重新调用一遍卖出接口

图片

图片

发现在单机模式下,添加 synchronized  关键字,确实能够避免商品卖超的问题!

但是在微服务情况下,针对该服务设置了集群,synchronized 还能保证数据的正确性吗?假设多个服务,被注册到服务器中心,每个微服务中的处理接口都有 synchronized 关键字

我创建了三个服务,用三个不同的端口模拟集群环境下 synchronized 问题

图片

然后把 stock 库存数调整为3的倍数,300

127.0.0.1:6379[10]> set stock 300 OK

创建 jmeter 测试接口

图片

执行后,发现三个接口都调用了100次,但是库存还有 145

127.0.0.1:6379[10]> get stock "145"

图片

分别打开两个服务的日志,发现他们消费了重复的商品。


解决方案二:Redis setnx实现分布式锁

在Redis中存在一条命令setnx (set if not exists)

setnx key value

如果不存在key,则可以设置成功;否则设置失败。

我们对 controller 方法进行修改一下

``` @RequestMapping("/redisLockStock") public String redisLockStock(){   String key = "lock";   // setnx   // 由于redis是一个单线程,执行命令采取“队列”形式排队!   // 优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败。   boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");   // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false   if (!result) {     // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!     return "err";   }   // 获取Redis数据库中的商品数量   Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 减库存   if (stock > 0) {     int realStock = stock - 1;     stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));     System.out.println("商品扣减成功,剩余商品:" + realStock);   } else {     System.out.println("库存不足.....");   }

// 程序执行完成,则删除这个key   stringRedisTemplate.delete(key);

return "end"; } ```

大概解释一下

setnx 这个命令是一个上锁的命令

127.0.0.1:6379[10]> exists lock (integer) 0 127.0.0.1:6379[10]> setnx lock "hi" (integer) 1 127.0.0.1:6379[10]> setnx lock "good" (integer) 0 127.0.0.1:6379[10]> get lock "hi"

通过这一段命令,我们可以看出来,setnx 设置的值,无法被修改。但是可以对其进行删除,再结合上面的代码,我们可以得到这样一个流程

图片

但就这样一块代码来说,不够严谨,一旦减少库存操作出现了异常,导致解锁无法执行,以至于其他请求一直无法拿到 key,程序逻辑死锁!

我们可以尝试用 try...finally 解决异常问题!

``` @RequestMapping("/redisLockStock") public String redisLockStock(){    // 创建一个key,保存至redis   String key = "lock";   // setnx   // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败   boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");   // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false   if (!result) {    // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!    return "err";   }      try {    // 获取Redis数据库中的商品数量    Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));    // 减库存    if (stock > 0) {     int realStock = stock - 1;     stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));     System.out.println("商品扣减成功,剩余商品:" + realStock);    } else {     System.out.println("库存不足.....");    }   } finally {    // 程序执行完成,则删除这个key    // 放置于finally中,保证即使上述逻辑出问题,也能del掉    stringRedisTemplate.delete(key);   }

return "end"; } ```

这样,就显得更严谨了。

But,但是!!!

如果有一台服务器在减库存的过程中,出现了断电、宕机等原因导致 finally 中的语句没有执行,同样出现 key 一直存在,导致死锁!

我们可以通过设置超时时间,我们可以用

stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS);

设置超时时间吗?不行!!因为可在创建锁成功的一瞬间就宕机,导致的设置时长无法执行,而死锁!所以我们要找一个原子类的操作API

stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS);

将 setIfAbsent 和 expire 合并成一条 原子命令。

好了,我们现在设置了10秒的过期时长,这样就万无一失了吗?

如果?执行减少库存的时间超过了10秒呢?而正好另一台服务器又接收到了请求,依然出现了  不存在的情况,超卖依旧没有解决。

所以我们到目前总结一下

图片


解决方案三:Redisson API

setnx 的方式会出现无法准确判断业务操作时长,而无法保证安全,设置时间太长,性能不好,设置时间短,容易出现卖超问题。

难道就没有其他办法了吗?

Redisson 和 jedis 都是 Java 实现 Redis 的客户端,但是 Redisson 比 jedis 具有更多功能

引入新的 pom 文件

<dependency>             <groupId>org.redisson</groupId>             <artifactId>redisson</artifactId>             <version>3.17.6</version>         </dependency>

创建 bean 配置类

``` package com.liuyuncen.config;

import org.redisson.Redisson; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

/*  * @belongsProject: radis_springboot  * @belongsPackage: com.liuyuncen.config  * @author: Xiang想  * @createTime: 2022-09-08  09:08  * @description: TODO  * @version: 1.0  / @Configuration public class RedissonConfig {

@Value("${spring.redis.host}")     private String redisHost;     @Value("${spring.redis.port}")     private String redisPort;     @Value("${spring.redis.password}")     private String password;     @Value("${spring.redis.database}")     private Integer dataBase;

@Bean     public Redisson createRedisson(){         Config config = new Config();         config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort).setDatabase(dataBase).setPassword(null);         return (Redisson) Redisson.create(config);     } } ```

这里要注意一下,因为我本地的 redis 没有密码,所以在 setPassword(null) 这里给了空。否则会报 ==Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6379== 错误

然后就是消费的接口方法

@GetMapping("/redissonLockStock")     public String redissonLockStock() throws InterruptedException {         String key = "lock";         RLock lock = redisson.getLock(key);         try {             lock.lock();             int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));             if (stock % 100 == 0){                 System.out.println("A服务 延迟10秒");                 Thread.sleep(10000);                 System.out.println("A服务 延迟10秒结束 继续后续操作");             }             if (stock>0){                 int resultStock = stock - 1;                 stringRedisTemplate.opsForValue().set("stock",String.valueOf(resultStock));                 System.out.println("A服务 商品扣除成功,剩余商品:"+resultStock);             }else {                 System.out.println("A服务 库存不足...");             }         }finally {             lock.unlock();         }         return "end";     }

这里我还特地添加了 10秒延迟,可以在这个时间段看到 lock 的值

图片

我们看到他也是通过 TTL 延时过期来实现的。那到底是咋做的呢?

我们来看一下 Redisson 源码

String key = "lock"; RLock lock = redisson.getLock(key);

org.redisson.RedissonLock.RedissonLock(CommandAsyncExecutor, String)中,我们看到 redisson 给 key 设置的属性中有超时时间

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {   super(commandExecutor, name);   this.commandExecutor = commandExecutor;   this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();   this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }

超时间数为(org.redisson.config.Config.lockWatchdogTimeout):默认30s

private long lockWatchdogTimeout = 30 * 1000;

其中加锁、续命锁在以下代码中实现

lock.lock();

查看源码(org.redisson.RedissonLock.lock()),逐个判断分析得到核心逻辑代码如下所示:

```  RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {         internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,            // 如果存在 KEYS[1],这个KEYS[1]就是最初设置的redisson.getLock(key)                   "if (redis.call('exists', KEYS[1]) == 0) then " +                       //上述代码执行逻辑为0,表示不存在                       // 不存在则将 锁key+线程id设置为hash类型数据保存redis(ARGV[2]为当前执行线程id)                       "redis.call('hset', KEYS[1], ARGV[2], 1); " +                       // 设置这个 hash数据类型 的有效时间                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +                       "return nil; " +                   "end; " +                   "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                     // 如果这个 锁key 在redis中存在,返回1表示数据存在                     //hincrby 自增1                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                       // 重新设定有效时间                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +                       "return nil; " +                   "end; " +                   "return redis.call('pttl', KEYS[1]);",                     Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));     } ```

我们可以看到一打开 redis.call 命令,其实这是 lua 语言,其中的指令有

exists 存在、pexpire 设置有效时间

根据上述源码中,存在设置超时时间默认为30秒,但是我们知道,真正的业务执行过程不见得就是30秒,拿着一块 redisson 怎么处理呢?

在源码org.redisson.RedissonLock.tryAcquireAsync(long, TimeUnit, long)中,针对时间处理参数做了如下操作:

``` private  RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {         if (leaseTime != -1) {             return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);         }         RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);         // 设置监听线程,当异步方法tryLockInnerAsync执行完触发         ttlRemainingFuture.addListener(new FutureListener() {          // 重写 operationComplete 方法             @Override             public void operationComplete(Future future) throws Exception {                 if (!future.isSuccess()) {                     return;                 }

Long ttlRemaining = future.getNow();                 // lock acquired                 if (ttlRemaining == null) {                     // 开启定时任务                     scheduleExpirationRenewal(threadId);                 }             }         });         return ttlRemainingFuture;     } ```

查看定时任务源码(org.redisson.RedissonLock.scheduleExpirationRenewal(long)):

``` private void scheduleExpirationRenewal(final long threadId) {         if (expirationRenewalMap.containsKey(getEntryName())) {             return;         }   // 定时任务的创建         Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {             @Override             public void run(Timeout timeout) throws Exception {                 //又是一个lua脚本,重新设置锁                 RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                         "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                         // 获取redis的hash数据类型中,指定的key-线程id 信息。                         // 如果 == 1 表示存在这个锁                         // 重新设置key的失效时间                             "redis.call('pexpire', KEYS[1], ARGV[1]); " +                             "return 1; " +                         "end; " +                         "return 0;",                           Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));                                  // 设置失效时间后(evalWriteAsync执行后),开启监听                 future.addListener(new FutureListener() {                     @Override                     public void operationComplete(Future future) throws Exception {                         expirationRenewalMap.remove(getEntryName());                         // 如果future 未执行成功                         if (!future.isSuccess()) {                             log.error("Can't update lock " + getName() + " expiration", future.cause());                             return;                         }                         // future 执行完成                         if (future.getNow()) {                          // 调取自身,此时并不会造成死循环                          // 调用自身,继续执行 TimerTask中的逻辑,包括定时操作                             // reschedule itself                             scheduleExpirationRenewal(threadId);                         }                     }                 });             }            // 每 30/3 也就是10秒         }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {             task.cancel();         }     } ```

通过这个定时和设置延迟时间,我们就可以清楚的知道, redisson 是如何做延时处理的

图片

redisson 并不是等30秒都执行完了,再去续命,而是每过10秒就续10秒

我每隔一秒执行 ttl lock 发现

127.0.0.1:6379[10]> ttl lock (integer) 23 127.0.0.1:6379[10]> ttl lock (integer) 22 127.0.0.1:6379[10]> ttl lock (integer) 21 127.0.0.1:6379[10]> ttl lock (integer) 20 127.0.0.1:6379[10]> ttl lock (integer) 29 127.0.0.1:6379[10]> ttl lock (integer) 28

那我们知道了 redisson 通过上锁加续命的方式解决分布式锁。还有其他的办法吗?

解决方案四:RedLock 高可用并发锁

用之前,我先说一下大致原理,RedLock 思想为了保证高可用性,在设置key 的时候,会创建多个节点,单个节点设置成功不会告诉程序获得了锁,只有超过半数的节点设置成功,才会告诉程序锁上了

所以我们要创建多个 key

``` @GetMapping("/redLockStock")     public String redLockStock(){         // 创建多个key,         String key1 = "lock:1";         String key2 = "lock:2";         String key3 = "lock:3";         RLock lock1 = redisson.getLock(key1);         RLock lock2 = redisson.getLock(key2);         RLock lock3 = redisson.getLock(key3);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);         try {             boolean tryLock = redLock.tryLock(10, 30, TimeUnit.SECONDS);             if (tryLock){                 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));                 if (stock>0){                     int resultStock = stock - 1;                     stringRedisTemplate.opsForValue().set("stock",String.valueOf(resultStock));                     System.out.println("A服务 商品扣除成功,剩余商品:"+resultStock);                 }else {                     System.out.println("A服务 库存不足...");                 }             }         }catch (InterruptedException e){             e.printStackTrace();         }finally {             redLock.unlock();         }         return "end";     } ```

多节点的Redis实现的分布式锁 RedLock 可以有效防止单点故障。

我们再来细说他为什么可以实现这样的功能

  1. 获取当前时间戳

  2. client 尝试按顺序使用相同的 key、value 获取所有 redis 服务的锁,在获取锁的过程中,获取时间比锁过期时间短的多,这是为了不要过长时间等待已经关闭的 Redis 服务,并且试着获取下一个 Redis 实例

    比如 TTL为5秒,设置获取锁的时间最多用1秒,如果1秒都没有获取到锁,那就放弃这个锁,立刻获取下一个锁

  3. client通过获取所有能获取的时间减去第一步的时间,这个时间差小于TTL时间并且少于有3个redis实例成功获取锁,才算正在的获取锁成功

  4. 如果成功拿到锁,锁的真正有效时间是 TTL 减去第三步的时间差,假如TTL是5秒,获取锁用了2秒,真正有效的就是3秒。

  5. 如果客户端由于某些情况获取锁失败,便会开始解锁所有redis,因为可能也就获取了小于3个锁,必须释放,否则影响其他client获取锁

图片

开始时间是T1是 0:00 ,获取锁时所有 key-value 都是一样的,TTL 是5min,假设漂移时间 1min,最后结束时间是 T2是 0:02 ,所以此锁最小有效时间为:TTL-(T2-T1)-漂移时间 = 5min - (0:02 - 0:00) -1min = 2min

  • RedLock 算法是否是异步算法?

可以看成是同步算法,因为即使进程间(多个电脑间)没有同步时间,但是每个进程时间流速大致相同,并且时钟漂移相对于 TTL 较小,可以忽略,所以可以看成同步算法。

  • RedLock 失败重试

当client 不能获取锁时,应该在随机时间后重试获取锁,并且最好在同一时刻并发把set命令发给所有redis实例,而且对于已经获取锁的client在完成任务后及时释放锁

  • RedLock 释放锁

由于释放锁会判断这个锁value是不是自己设置的,如果是才删除,所以释放的时候很简单,只要向所有实例发出释放锁的命令,不用考虑是否成功释放

好,到这里,相信你对卖超问题已经能很好的解决了!

「其他文章」