Redis 解決賣超問題

語言: CN / TW / HK

本文從Redis出發,搭建模擬高併發場景並實現超賣情況,再從redis的角度解決這個問題。場景也是從單機環境到集羣環境,模擬此操作,需要jmeter工具和單個redis服務(因為是模擬,只是為了測線程安全,無需集羣)

前言:

需要有可用的 Redis 及 SpringBoot 整合 Redis 的基礎操作,jmeter 工具 (jmeter下載地址:https://jmeter.apache.org/download_jmeter.cgi)

浪費別人的時間就是在謀財害命

文章涉及源碼:Gitee地址

https://gitee.com/Array_Xiang/spring-boot-redis.git

  • 先寫一段簡單的代碼
    • 解決方案一:設置 synchronzied
    • 解決方案二:Redis setnx實現分佈式鎖
    • 解決方案三:Redisson API
    • 解決方案四:RedLock 高可用併發鎖

先寫一段簡單的代碼

兩個接口,創建一個stock 商品設置200個庫存

另一個接口,獲取 redis 的庫存數,判斷是否有庫存,如果有,就取出來-1再放回去。

``` package com.liuyuncen.shop.controller;

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;

/*  * @belongsProject: radis_springboot  * @belongsPackage: com.liuyuncen.shop.controller  * @author: Xiang想  * @createTime: 2022-09-05  16:59  * @description: TODO  * @version: 1.0  / @RestController public class ShopController {

@Autowired     StringRedisTemplate stringRedisTemplate;

/      * @description: 創建一個庫存量為200的商品      * @author: Xiang想      * @date: 2022/9/5 5:00 PM      * @param: []      * @return: java.lang.String      /     @RequestMapping("/setStock")     public String setStock(){         stringRedisTemplate.opsForValue().set("stock",200+"");         return "ok";     }

/      * @description: 賣出商品,有庫存就減1 沒有庫存就打印無庫存日誌      * @author: Xiang想      * @date: 2022/9/5 5:02 PM      * @param: []      * @return: java.lang.String      /     @RequestMapping("/deductStock")     public String deductStock(){         int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));         if (stock>0){             int realStock = stock - 1;             stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));             System.out.println("商品扣減成功,剩餘商品:"+realStock);         }else {             System.out.println("庫存不足....");         }         return "end";     } } ```

寫好了,啟動服務

圖片

先執行第一個接口,創建一個stock 商品,然後創建 jmeter 測試案例

圖片

圖片

如果不知道怎麼用,看這個鏈接:https://blog.csdn.net/bin0503/article/details/123543484。創建完成後,點擊上面的綠色箭頭

圖片

看到這裏,大家就知道發生了啥了,一個庫存被賣了七八次,肯定是賣超了,但是有問題了,redis 是單線程的呀,為啥會賣超呢?


解決方案一:設置 synchronzied

既然因為併發太高導致的問題,那肯定和線程有關,我們嘗試加個鎖唄

先調用第一個接口,重置庫存數,使用客户端命令排查一下

圖片

添加 synchronized 關鍵字

/**      * @description: 在賣出的服務中添加 synchronized 關鍵字 添加鎖      * @author: Xiang想      * @date: 2022/9/5 5:02 PM      * @param: []      * @return: java.lang.String      **/     @RequestMapping("/syncDeductStock")     public String syncDeductStock(){         synchronized (this){             int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));             if (stock>0){                 int realStock = stock - 1;                 stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));                 System.out.println("商品扣減成功,剩餘商品:"+realStock);             }else {                 System.out.println("庫存不足....");             }             return "end";         }     }

重新調用一遍賣出接口

圖片

圖片

發現在單機模式下,添加 synchronized  關鍵字,確實能夠避免商品賣超的問題!

但是在微服務情況下,針對該服務設置了集羣,synchronized 還能保證數據的正確性嗎?假設多個服務,被註冊到服務器中心,每個微服務中的處理接口都有 synchronized 關鍵字

我創建了三個服務,用三個不同的端口模擬集羣環境下 synchronized 問題

圖片

然後把 stock 庫存數調整為3的倍數,300

127.0.0.1:6379[10]> set stock 300 OK

創建 jmeter 測試接口

圖片

執行後,發現三個接口都調用了100次,但是庫存還有 145

127.0.0.1:6379[10]> get stock "145"

圖片

分別打開兩個服務的日誌,發現他們消費了重複的商品。


解決方案二:Redis setnx實現分佈式鎖

在Redis中存在一條命令setnx (set if not exists)

setnx key value

如果不存在key,則可以設置成功;否則設置失敗。

我們對 controller 方法進行修改一下

``` @RequestMapping("/redisLockStock") public String redisLockStock(){   String key = "lock";   // setnx   // 由於redis是一個單線程,執行命令採取“隊列”形式排隊!   // 優先進入隊列的命令先執行,由於是setnx,第一個執行後,其他操作執行失敗。   boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");   // 當不存在key時,可以設置成功,回執true;如果存在key,則無法設置,返回false   if (!result) {     // 前端監測,redis中存在,則不能讓這個搶購操作執行,予以提示!     return "err";   }   // 獲取Redis數據庫中的商品數量   Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 減庫存   if (stock > 0) {     int realStock = stock - 1;     stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));     System.out.println("商品扣減成功,剩餘商品:" + realStock);   } else {     System.out.println("庫存不足.....");   }

// 程序執行完成,則刪除這個key   stringRedisTemplate.delete(key);

return "end"; } ```

大概解釋一下

setnx 這個命令是一個上鎖的命令

127.0.0.1:6379[10]> exists lock (integer) 0 127.0.0.1:6379[10]> setnx lock "hi" (integer) 1 127.0.0.1:6379[10]> setnx lock "good" (integer) 0 127.0.0.1:6379[10]> get lock "hi"

通過這一段命令,我們可以看出來,setnx 設置的值,無法被修改。但是可以對其進行刪除,再結合上面的代碼,我們可以得到這樣一個流程

圖片

但就這樣一塊代碼來説,不夠嚴謹,一旦減少庫存操作出現了異常,導致解鎖無法執行,以至於其他請求一直無法拿到 key,程序邏輯死鎖!

我們可以嘗試用 try...finally 解決異常問題!

``` @RequestMapping("/redisLockStock") public String redisLockStock(){    // 創建一個key,保存至redis   String key = "lock";   // setnx   // 由於redis是一個單線程,執行命令採取隊列形式排隊!優先進入隊列的命令先執行,由於是setnx,第一個執行後,其他操作執行失敗   boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");   // 當不存在key時,可以設置成功,回執true;如果存在key,則無法設置,返回false   if (!result) {    // 前端監測,redis中存在,則不能讓這個搶購操作執行,予以提示!    return "err";   }      try {    // 獲取Redis數據庫中的商品數量    Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));    // 減庫存    if (stock > 0) {     int realStock = stock - 1;     stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));     System.out.println("商品扣減成功,剩餘商品:" + realStock);    } else {     System.out.println("庫存不足.....");    }   } finally {    // 程序執行完成,則刪除這個key    // 放置於finally中,保證即使上述邏輯出問題,也能del掉    stringRedisTemplate.delete(key);   }

return "end"; } ```

這樣,就顯得更嚴謹了。

But,但是!!!

如果有一台服務器在減庫存的過程中,出現了斷電、宕機等原因導致 finally 中的語句沒有執行,同樣出現 key 一直存在,導致死鎖!

我們可以通過設置超時時間,我們可以用

stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS);

設置超時時間嗎?不行!!因為可在創建鎖成功的一瞬間就宕機,導致的設置時長無法執行,而死鎖!所以我們要找一個原子類的操作API

stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS);

將 setIfAbsent 和 expire 合併成一條 原子命令。

好了,我們現在設置了10秒的過期時長,這樣就萬無一失了嗎?

如果?執行減少庫存的時間超過了10秒呢?而正好另一台服務器又接收到了請求,依然出現了  不存在的情況,超賣依舊沒有解決。

所以我們到目前總結一下

圖片


解決方案三:Redisson API

setnx 的方式會出現無法準確判斷業務操作時長,而無法保證安全,設置時間太長,性能不好,設置時間短,容易出現賣超問題。

難道就沒有其他辦法了嗎?

Redisson 和 jedis 都是 Java 實現 Redis 的客户端,但是 Redisson 比 jedis 具有更多功能

引入新的 pom 文件

<dependency>             <groupId>org.redisson</groupId>             <artifactId>redisson</artifactId>             <version>3.17.6</version>         </dependency>

創建 bean 配置類

``` package com.liuyuncen.config;

import org.redisson.Redisson; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

/*  * @belongsProject: radis_springboot  * @belongsPackage: com.liuyuncen.config  * @author: Xiang想  * @createTime: 2022-09-08  09:08  * @description: TODO  * @version: 1.0  / @Configuration public class RedissonConfig {

@Value("${spring.redis.host}")     private String redisHost;     @Value("${spring.redis.port}")     private String redisPort;     @Value("${spring.redis.password}")     private String password;     @Value("${spring.redis.database}")     private Integer dataBase;

@Bean     public Redisson createRedisson(){         Config config = new Config();         config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort).setDatabase(dataBase).setPassword(null);         return (Redisson) Redisson.create(config);     } } ```

這裏要注意一下,因為我本地的 redis 沒有密碼,所以在 setPassword(null) 這裏給了空。否則會報 ==Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6379== 錯誤

然後就是消費的接口方法

@GetMapping("/redissonLockStock")     public String redissonLockStock() throws InterruptedException {         String key = "lock";         RLock lock = redisson.getLock(key);         try {             lock.lock();             int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));             if (stock % 100 == 0){                 System.out.println("A服務 延遲10秒");                 Thread.sleep(10000);                 System.out.println("A服務 延遲10秒結束 繼續後續操作");             }             if (stock>0){                 int resultStock = stock - 1;                 stringRedisTemplate.opsForValue().set("stock",String.valueOf(resultStock));                 System.out.println("A服務 商品扣除成功,剩餘商品:"+resultStock);             }else {                 System.out.println("A服務 庫存不足...");             }         }finally {             lock.unlock();         }         return "end";     }

這裏我還特地添加了 10秒延遲,可以在這個時間段看到 lock 的值

圖片

我們看到他也是通過 TTL 延時過期來實現的。那到底是咋做的呢?

我們來看一下 Redisson 源碼

String key = "lock"; RLock lock = redisson.getLock(key);

org.redisson.RedissonLock.RedissonLock(CommandAsyncExecutor, String)中,我們看到 redisson 給 key 設置的屬性中有超時時間

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {   super(commandExecutor, name);   this.commandExecutor = commandExecutor;   this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();   this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }

超時間數為(org.redisson.config.Config.lockWatchdogTimeout):默認30s

private long lockWatchdogTimeout = 30 * 1000;

其中加鎖、續命鎖在以下代碼中實現

lock.lock();

查看源碼(org.redisson.RedissonLock.lock()),逐個判斷分析得到核心邏輯代碼如下所示:

```  RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {         internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,            // 如果存在 KEYS[1],這個KEYS[1]就是最初設置的redisson.getLock(key)                   "if (redis.call('exists', KEYS[1]) == 0) then " +                       //上述代碼執行邏輯為0,表示不存在                       // 不存在則將 鎖key+線程id設置為hash類型數據保存redis(ARGV[2]為當前執行線程id)                       "redis.call('hset', KEYS[1], ARGV[2], 1); " +                       // 設置這個 hash數據類型 的有效時間                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +                       "return nil; " +                   "end; " +                   "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                     // 如果這個 鎖key 在redis中存在,返回1表示數據存在                     //hincrby 自增1                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                       // 重新設定有效時間                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +                       "return nil; " +                   "end; " +                   "return redis.call('pttl', KEYS[1]);",                     Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));     } ```

我們可以看到一打開 redis.call 命令,其實這是 lua 語言,其中的指令有

exists 存在、pexpire 設置有效時間

根據上述源碼中,存在設置超時時間默認為30秒,但是我們知道,真正的業務執行過程不見得就是30秒,拿着一塊 redisson 怎麼處理呢?

在源碼org.redisson.RedissonLock.tryAcquireAsync(long, TimeUnit, long)中,針對時間處理參數做了如下操作:

``` private  RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {         if (leaseTime != -1) {             return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);         }         RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);         // 設置監聽線程,當異步方法tryLockInnerAsync執行完觸發         ttlRemainingFuture.addListener(new FutureListener() {          // 重寫 operationComplete 方法             @Override             public void operationComplete(Future future) throws Exception {                 if (!future.isSuccess()) {                     return;                 }

Long ttlRemaining = future.getNow();                 // lock acquired                 if (ttlRemaining == null) {                     // 開啟定時任務                     scheduleExpirationRenewal(threadId);                 }             }         });         return ttlRemainingFuture;     } ```

查看定時任務源碼(org.redisson.RedissonLock.scheduleExpirationRenewal(long)):

``` private void scheduleExpirationRenewal(final long threadId) {         if (expirationRenewalMap.containsKey(getEntryName())) {             return;         }   // 定時任務的創建         Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {             @Override             public void run(Timeout timeout) throws Exception {                 //又是一個lua腳本,重新設置鎖                 RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                         "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                         // 獲取redis的hash數據類型中,指定的key-線程id 信息。                         // 如果 == 1 表示存在這個鎖                         // 重新設置key的失效時間                             "redis.call('pexpire', KEYS[1], ARGV[1]); " +                             "return 1; " +                         "end; " +                         "return 0;",                           Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));                                  // 設置失效時間後(evalWriteAsync執行後),開啟監聽                 future.addListener(new FutureListener() {                     @Override                     public void operationComplete(Future future) throws Exception {                         expirationRenewalMap.remove(getEntryName());                         // 如果future 未執行成功                         if (!future.isSuccess()) {                             log.error("Can't update lock " + getName() + " expiration", future.cause());                             return;                         }                         // future 執行完成                         if (future.getNow()) {                          // 調取自身,此時並不會造成死循環                          // 調用自身,繼續執行 TimerTask中的邏輯,包括定時操作                             // reschedule itself                             scheduleExpirationRenewal(threadId);                         }                     }                 });             }            // 每 30/3 也就是10秒         }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {             task.cancel();         }     } ```

通過這個定時和設置延遲時間,我們就可以清楚的知道, redisson 是如何做延時處理的

圖片

redisson 並不是等30秒都執行完了,再去續命,而是每過10秒就續10秒

我每隔一秒執行 ttl lock 發現

127.0.0.1:6379[10]> ttl lock (integer) 23 127.0.0.1:6379[10]> ttl lock (integer) 22 127.0.0.1:6379[10]> ttl lock (integer) 21 127.0.0.1:6379[10]> ttl lock (integer) 20 127.0.0.1:6379[10]> ttl lock (integer) 29 127.0.0.1:6379[10]> ttl lock (integer) 28

那我們知道了 redisson 通過上鎖加續命的方式解決分佈式鎖。還有其他的辦法嗎?

解決方案四:RedLock 高可用併發鎖

用之前,我先説一下大致原理,RedLock 思想為了保證高可用性,在設置key 的時候,會創建多個節點,單個節點設置成功不會告訴程序獲得了鎖,只有超過半數的節點設置成功,才會告訴程序鎖上了

所以我們要創建多個 key

``` @GetMapping("/redLockStock")     public String redLockStock(){         // 創建多個key,         String key1 = "lock:1";         String key2 = "lock:2";         String key3 = "lock:3";         RLock lock1 = redisson.getLock(key1);         RLock lock2 = redisson.getLock(key2);         RLock lock3 = redisson.getLock(key3);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);         try {             boolean tryLock = redLock.tryLock(10, 30, TimeUnit.SECONDS);             if (tryLock){                 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));                 if (stock>0){                     int resultStock = stock - 1;                     stringRedisTemplate.opsForValue().set("stock",String.valueOf(resultStock));                     System.out.println("A服務 商品扣除成功,剩餘商品:"+resultStock);                 }else {                     System.out.println("A服務 庫存不足...");                 }             }         }catch (InterruptedException e){             e.printStackTrace();         }finally {             redLock.unlock();         }         return "end";     } ```

多節點的Redis實現的分佈式鎖 RedLock 可以有效防止單點故障。

我們再來細説他為什麼可以實現這樣的功能

  1. 獲取當前時間戳

  2. client 嘗試按順序使用相同的 key、value 獲取所有 redis 服務的鎖,在獲取鎖的過程中,獲取時間比鎖過期時間短的多,這是為了不要過長時間等待已經關閉的 Redis 服務,並且試着獲取下一個 Redis 實例

    比如 TTL為5秒,設置獲取鎖的時間最多用1秒,如果1秒都沒有獲取到鎖,那就放棄這個鎖,立刻獲取下一個鎖

  3. client通過獲取所有能獲取的時間減去第一步的時間,這個時間差小於TTL時間並且少於有3個redis實例成功獲取鎖,才算正在的獲取鎖成功

  4. 如果成功拿到鎖,鎖的真正有效時間是 TTL 減去第三步的時間差,假如TTL是5秒,獲取鎖用了2秒,真正有效的就是3秒。

  5. 如果客户端由於某些情況獲取鎖失敗,便會開始解鎖所有redis,因為可能也就獲取了小於3個鎖,必須釋放,否則影響其他client獲取鎖

圖片

開始時間是T1是 0:00 ,獲取鎖時所有 key-value 都是一樣的,TTL 是5min,假設漂移時間 1min,最後結束時間是 T2是 0:02 ,所以此鎖最小有效時間為:TTL-(T2-T1)-漂移時間 = 5min - (0:02 - 0:00) -1min = 2min

  • RedLock 算法是否是異步算法?

可以看成是同步算法,因為即使進程間(多個電腦間)沒有同步時間,但是每個進程時間流速大致相同,並且時鐘漂移相對於 TTL 較小,可以忽略,所以可以看成同步算法。

  • RedLock 失敗重試

當client 不能獲取鎖時,應該在隨機時間後重試獲取鎖,並且最好在同一時刻併發把set命令發給所有redis實例,而且對於已經獲取鎖的client在完成任務後及時釋放鎖

  • RedLock 釋放鎖

由於釋放鎖會判斷這個鎖value是不是自己設置的,如果是才刪除,所以釋放的時候很簡單,只要向所有實例發出釋放鎖的命令,不用考慮是否成功釋放

好,到這裏,相信你對賣超問題已經能很好的解決了!

「其他文章」