Redis 解決賣超問題
本文從Redis出發,搭建模擬高併發場景並實現超賣情況,再從redis的角度解決這個問題。場景也是從單機環境到叢集環境,模擬此操作,需要jmeter工具和單個redis服務(因為是模擬,只是為了測執行緒安全,無需叢集)
前言:
需要有可用的 Redis 及 SpringBoot 整合 Redis 的基礎操作,jmeter 工具 (jmeter下載地址:http://jmeter.apache.org/download_jmeter.cgi)
浪費別人的時間就是在謀財害命
文章涉及原始碼:Gitee地址
http://gitee.com/Array_Xiang/spring-boot-redis.git
- 先寫一段簡單的程式碼
- 解決方案一:設定 synchronzied
- 解決方案二:Redis setnx實現分散式鎖
- 解決方案三:Redisson API
- 解決方案四:RedLock 高可用併發鎖
先寫一段簡單的程式碼
“
兩個介面,建立一個
stock
商品設定200個庫存另一個介面,獲取 redis 的庫存數,判斷是否有庫存,如果有,就取出來-1再放回去。
”
``` package com.liuyuncen.shop.controller;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
/* * @belongsProject: radis_springboot * @belongsPackage: com.liuyuncen.shop.controller * @author: Xiang想 * @createTime: 2022-09-05 16:59 * @description: TODO * @version: 1.0 / @RestController public class ShopController {
@Autowired StringRedisTemplate stringRedisTemplate;
/ * @description: 建立一個庫存量為200的商品 * @author: Xiang想 * @date: 2022/9/5 5:00 PM * @param: [] * @return: java.lang.String / @RequestMapping("/setStock") public String setStock(){ stringRedisTemplate.opsForValue().set("stock",200+""); return "ok"; }
/ * @description: 賣出商品,有庫存就減1 沒有庫存就列印無庫存日誌 * @author: Xiang想 * @date: 2022/9/5 5:02 PM * @param: [] * @return: java.lang.String / @RequestMapping("/deductStock") public String deductStock(){ int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock>0){ int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock)); System.out.println("商品扣減成功,剩餘商品:"+realStock); }else { System.out.println("庫存不足...."); } return "end"; } } ```
寫好了,啟動服務
先執行第一個介面,建立一個stock 商品,然後建立 jmeter 測試案例
如果不知道怎麼用,看這個連結:http://blog.csdn.net/bin0503/article/details/123543484。建立完成後,點選上面的綠色箭頭
看到這裡,大家就知道發生了啥了,一個庫存被賣了七八次,肯定是賣超了,但是有問題了,redis 是單執行緒的呀,為啥會賣超呢?
解決方案一:設定 synchronzied
既然因為併發太高導致的問題,那肯定和執行緒有關,我們嘗試加個鎖唄
先呼叫第一個介面,重置庫存數,使用客戶端命令排查一下
新增 synchronized
關鍵字
/**
* @description: 在賣出的服務中新增 synchronized 關鍵字 新增鎖
* @author: Xiang想
* @date: 2022/9/5 5:02 PM
* @param: []
* @return: java.lang.String
**/
@RequestMapping("/syncDeductStock")
public String syncDeductStock(){
synchronized (this){
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock>0){
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));
System.out.println("商品扣減成功,剩餘商品:"+realStock);
}else {
System.out.println("庫存不足....");
}
return "end";
}
}
重新呼叫一遍賣出介面
發現在單機模式下,新增 synchronized 關鍵字,確實能夠避免商品賣超的問題!
但是在微服務情況下,針對該服務設定了叢集,synchronized
還能保證資料的正確性嗎?假設多個服務,被註冊到伺服器中心,每個微服務中的處理介面都有 synchronized
關鍵字
我建立了三個服務,用三個不同的埠模擬叢集環境下 synchronized 問題
然後把 stock 庫存數調整為3的倍數,300
127.0.0.1:6379[10]> set stock 300
OK
建立 jmeter 測試介面
執行後,發現三個介面都呼叫了100次,但是庫存還有 145
127.0.0.1:6379[10]> get stock
"145"
分別開啟兩個服務的日誌,發現他們消費了重複的商品。
解決方案二:Redis setnx實現分散式鎖
在Redis中存在一條命令setnx (set if not exists)
“
setnx key value
如果不存在key,則可以設定成功;否則設定失敗。
”
我們對 controller 方法進行修改一下
``` @RequestMapping("/redisLockStock") public String redisLockStock(){ String key = "lock"; // setnx // 由於redis是一個單執行緒,執行命令採取“佇列”形式排隊! // 優先進入佇列的命令先執行,由於是setnx,第一個執行後,其他操作執行失敗。 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 當不存在key時,可以設定成功,回執true;如果存在key,則無法設定,返回false if (!result) { // 前端監測,redis中存在,則不能讓這個搶購操作執行,予以提示! return "err"; } // 獲取Redis資料庫中的商品數量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 減庫存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣減成功,剩餘商品:" + realStock); } else { System.out.println("庫存不足....."); }
// 程式執行完成,則刪除這個key stringRedisTemplate.delete(key);
return "end"; } ```
大概解釋一下
setnx
這個命令是一個上鎖的命令
127.0.0.1:6379[10]> exists lock
(integer) 0
127.0.0.1:6379[10]> setnx lock "hi"
(integer) 1
127.0.0.1:6379[10]> setnx lock "good"
(integer) 0
127.0.0.1:6379[10]> get lock
"hi"
通過這一段命令,我們可以看出來,setnx 設定的值,無法被修改。但是可以對其進行刪除,再結合上面的程式碼,我們可以得到這樣一個流程
但就這樣一塊程式碼來說,不夠嚴謹,一旦減少庫存操作出現了異常,導致解鎖無法執行,以至於其他請求一直無法拿到 key,程式邏輯死鎖!
我們可以嘗試用 try...finally 解決異常問題!
``` @RequestMapping("/redisLockStock") public String redisLockStock(){ // 建立一個key,儲存至redis String key = "lock"; // setnx // 由於redis是一個單執行緒,執行命令採取佇列形式排隊!優先進入佇列的命令先執行,由於是setnx,第一個執行後,其他操作執行失敗 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 當不存在key時,可以設定成功,回執true;如果存在key,則無法設定,返回false if (!result) { // 前端監測,redis中存在,則不能讓這個搶購操作執行,予以提示! return "err"; } try { // 獲取Redis資料庫中的商品數量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 減庫存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣減成功,剩餘商品:" + realStock); } else { System.out.println("庫存不足....."); } } finally { // 程式執行完成,則刪除這個key // 放置於finally中,保證即使上述邏輯出問題,也能del掉 stringRedisTemplate.delete(key); }
return "end"; } ```
這樣,就顯得更嚴謹了。
But,但是!!!
如果有一臺伺服器在減庫存的過程中,出現了斷電、宕機等原因導致 finally 中的語句沒有執行,同樣出現 key 一直存在,導致死鎖!
我們可以通過設定超時時間,我們可以用
stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS);
設定超時時間嗎?不行!!因為可在建立鎖成功的一瞬間就宕機,導致的設定時長無法執行,而死鎖!所以我們要找一個原子類的操作API
stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS);
將 setIfAbsent
和 expire
合併成一條 原子命令。
好了,我們現在設定了10秒的過期時長,這樣就萬無一失了嗎?
如果?執行減少庫存的時間超過了10秒呢?而正好另一臺伺服器又接收到了請求,依然出現了 鎖
不存在的情況,超賣依舊沒有解決。
所以我們到目前總結一下
解決方案三:Redisson API
setnx 的方式會出現無法準確判斷業務操作時長,而無法保證安全,設定時間太長,效能不好,設定時間短,容易出現賣超問題。
難道就沒有其他辦法了嗎?
Redisson 和 jedis 都是 Java 實現 Redis 的客戶端,但是 Redisson 比 jedis 具有更多功能
引入新的 pom 檔案
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.6</version>
</dependency>
建立 bean 配置類
``` package com.liuyuncen.config;
import org.redisson.Redisson; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
/* * @belongsProject: radis_springboot * @belongsPackage: com.liuyuncen.config * @author: Xiang想 * @createTime: 2022-09-08 09:08 * @description: TODO * @version: 1.0 / @Configuration public class RedissonConfig {
@Value("${spring.redis.host}") private String redisHost; @Value("${spring.redis.port}") private String redisPort; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.database}") private Integer dataBase;
@Bean public Redisson createRedisson(){ Config config = new Config(); config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort).setDatabase(dataBase).setPassword(null); return (Redisson) Redisson.create(config); } } ```
這裡要注意一下,因為我本地的 redis 沒有密碼,所以在 setPassword(null) 這裡給了空。否則會報 ==Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6379== 錯誤
然後就是消費的介面方法
@GetMapping("/redissonLockStock")
public String redissonLockStock() throws InterruptedException {
String key = "lock";
RLock lock = redisson.getLock(key);
try {
lock.lock();
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock % 100 == 0){
System.out.println("A服務 延遲10秒");
Thread.sleep(10000);
System.out.println("A服務 延遲10秒結束 繼續後續操作");
}
if (stock>0){
int resultStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock",String.valueOf(resultStock));
System.out.println("A服務 商品扣除成功,剩餘商品:"+resultStock);
}else {
System.out.println("A服務 庫存不足...");
}
}finally {
lock.unlock();
}
return "end";
}
這裡我還特地添加了 10秒延遲,可以在這個時間段看到 lock 的值
我們看到他也是通過 TTL 延時過期來實現的。那到底是咋做的呢?
我們來看一下 Redisson 原始碼
String key = "lock";
RLock lock = redisson.getLock(key);
在org.redisson.RedissonLock.RedissonLock(CommandAsyncExecutor, String)
中,我們看到 redisson 給 key 設定的屬性中有超時時間
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
超時間數為(org.redisson.config.Config.lockWatchdogTimeout
):預設30s
private long lockWatchdogTimeout = 30 * 1000;
其中加鎖、續命鎖在以下程式碼中實現
lock.lock();
檢視原始碼(org.redisson.RedissonLock.lock()
),逐個判斷分析得到核心邏輯程式碼如下所示:
```
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 如果存在 KEYS[1],這個KEYS[1]就是最初設定的redisson.getLock(key) "if (redis.call('exists', KEYS[1]) == 0) then " + //上述程式碼執行邏輯為0,表示不存在 // 不存在則將 鎖key+執行緒id設定為hash型別資料儲存redis(ARGV[2]為當前執行執行緒id) "redis.call('hset', KEYS[1], ARGV[2], 1); " + // 設定這個 hash資料型別 的有效時間 "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果這個 鎖key 在redis中存在,返回1表示資料存在 //hincrby 自增1 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 重新設定有效時間 "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.
Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { // 開啟定時任務 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } ```
檢視定時任務原始碼(org.redisson.RedissonLock.scheduleExpirationRenewal(long)
):
```
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
// 定時任務的建立
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//又是一個lua指令碼,重新設定鎖
RFuture
所以我們要建立多個 key
``` @GetMapping("/redLockStock") public String redLockStock(){ // 建立多個key, String key1 = "lock:1"; String key2 = "lock:2"; String key3 = "lock:3"; RLock lock1 = redisson.getLock(key1); RLock lock2 = redisson.getLock(key2); RLock lock3 = redisson.getLock(key3);
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3); try { boolean tryLock = redLock.tryLock(10, 30, TimeUnit.SECONDS); if (tryLock){ int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock>0){ int resultStock = stock - 1; stringRedisTemplate.opsForValue().set("stock",String.valueOf(resultStock)); System.out.println("A服務 商品扣除成功,剩餘商品:"+resultStock); }else { System.out.println("A服務 庫存不足..."); } } }catch (InterruptedException e){ e.printStackTrace(); }finally { redLock.unlock(); } return "end"; } ```
多節點的Redis實現的分散式鎖 RedLock 可以有效防止單點故障。
我們再來細說他為什麼可以實現這樣的功能
-
獲取當前時間戳
-
client 嘗試按順序使用相同的 key、value 獲取所有 redis 服務的鎖,在獲取鎖的過程中,獲取時間比鎖過期時間短的多,這是為了不要過長時間等待已經關閉的 Redis 服務,並且試著獲取下一個 Redis 例項
比如 TTL為5秒,設定獲取鎖的時間最多用1秒,如果1秒都沒有獲取到鎖,那就放棄這個鎖,立刻獲取下一個鎖
-
client通過獲取所有能獲取的時間減去第一步的時間,這個時間差小於TTL時間並且少於有3個redis例項成功獲取鎖,才算正在的獲取鎖成功
-
如果成功拿到鎖,鎖的真正有效時間是 TTL 減去第三步的時間差,假如TTL是5秒,獲取鎖用了2秒,真正有效的就是3秒。
-
如果客戶端由於某些情況獲取鎖失敗,便會開始解鎖所有redis,因為可能也就獲取了小於3個鎖,必須釋放,否則影響其他client獲取鎖
開始時間是T1是 0:00 ,獲取鎖時所有 key-value 都是一樣的,TTL 是5min,假設漂移時間 1min,最後結束時間是 T2是 0:02 ,所以此鎖最小有效時間為:TTL-(T2-T1)-漂移時間 = 5min - (0:02 - 0:00) -1min = 2min
- RedLock 演算法是否是非同步演算法?
可以看成是同步演算法,因為即使程序間(多個電腦間)沒有同步時間,但是每個程序時間流速大致相同,並且時鐘漂移相對於 TTL 較小,可以忽略,所以可以看成同步演算法。
- RedLock 失敗重試
當client 不能獲取鎖時,應該在隨機時間後重試獲取鎖,並且最好在同一時刻併發把set命令發給所有redis例項,而且對於已經獲取鎖的client在完成任務後及時釋放鎖
- RedLock 釋放鎖
由於釋放鎖會判斷這個鎖value是不是自己設定的,如果是才刪除,所以釋放的時候很簡單,只要向所有例項發出釋放鎖的命令,不用考慮是否成功釋放
好,到這裡,相信你對賣超問題已經能很好的解決了!