業務常見error示例——併發工具類庫導致的執行緒安全問題
摘要
通常提到執行緒安全問題等就有可能聽到關執行緒安全和併發工具的一些片面的觀點和結論。比如“把 HashMap 改為 ConcurrentHashMap,就可以解決併發問題了呀”“要不我們試試無鎖的 CopyOnWriteArrayList 吧,效能更好”。的確,為了方便開發者進行多執行緒程式設計,現代程式語言會提供各種併發工具類。但如果我們沒有充分了解它們的使用場景、解決的問題,以及最佳實踐的話,盲目使用就可能會導致一些坑,小則損失效能,大則無法確保多執行緒情況下業務邏輯正確性。
一、執行緒重用導致使用者資訊錯亂的Bug
之前有業務同學和我反饋,在生產上遇到一個詭異的問題,有時獲取到的使用者資訊是別人的。檢視程式碼後,我發現他使用了ThreadLocal 來快取獲取到的使用者資訊。我們知道,ThreadLocal 適用於變數線上程間隔離,而在方法或類間共享的場景。如果使用者資訊的獲取比較昂貴(比如從資料庫查詢使用者資訊),那麼在 ThreadLocal 中快取資料是比較合適的做法。但,這麼做為什麼會出現使用者資訊錯亂的 Bug 呢?
使用 Spring Boot 建立一個 Web 應用程式,使用 ThreadLocal 存放一個 Integer 的值,來暫且代表需要線上程中儲存的使用者資訊,這個值初始是 null。在業務邏輯中,我先從 ThreadLocal 獲取一次值,然後把外部傳入的引數設定到 ThreadLocal 中,來模擬從當前上下文獲取到使用者資訊的邏輯,隨後再獲取一次值,最後輸出兩次獲得的值和執行緒名稱。
```java
private static final ThreadLocal
@GetMapping("wrong") public Map wrong(@RequestParam("userId") Integer userId) {
//設定使用者資訊之前先查詢一次ThreadLocal中的使用者資訊
String before = Thread.currentThread().getName() + ":" + currentUser.get();
//設定使用者資訊到ThreadLocal
currentUser.set(userId);
//設定使用者資訊之後再查詢一次ThreadLocal中的使用者資訊
String after = Thread.currentThread().getName() + ":" + currentUser.get();
//彙總輸出兩次查詢結果
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
} ``` 按理說,在設定使用者資訊之前第一次獲取的值始終應該是 null,但我們要意識到,程式執行在 Tomcat 中,執行程式的執行緒是 Tomcat 的工作執行緒,而 Tomcat 的工作執行緒是基於執行緒池的。顧名思義,執行緒池會重用固定的幾個執行緒,一旦執行緒重用,那麼很可能首次從 ThreadLocal 獲取的值是之前其他使用者的請求遺留的值。這時,ThreadLocal 中的使用者資訊就是其他使用者的資訊。
為了更快地重現這個問題,我在配置檔案中設定一下 Tomcat 的引數,把工作執行緒池最大執行緒數設定為 1,這樣始終是同一個執行緒在處理請求:
隨後使用者 2 來請求介面,這次就出現了 Bug,第一和第二次獲取到使用者 ID 分別是 1 和 2,如果是按照正常的來說數的應該是的null和2。顯然第一次獲取到了使用者 1 的資訊,原因就是Tomcat 的執行緒池重用了執行緒。從圖中可以看到,兩次請求的執行緒都是同一個執行緒:http-nio-8080-exec-1。
這個例子告訴我們,在寫業務程式碼時,首先要理解程式碼會跑在什麼執行緒上:
我們可能會抱怨學多執行緒沒用,因為程式碼裡沒有開啟使用多執行緒。但其實,可能只是我們沒有意識到,在 Tomcat 這種 Web 伺服器下跑的業務程式碼,本來就執行在一個多執行緒環境(否則介面也不可能支援這麼高的併發),並不能認為沒有顯式開啟多執行緒就不會有執行緒安全問題。
因為執行緒的建立比較昂貴,所以 Web 伺服器往往會使用執行緒池來處理請求,這就意味著執行緒會被重用。這時,使用類似 ThreadLocal 工具來存放一些資料時,需要特別注意在程式碼執行完後,顯式地去清空設定的資料。如果在程式碼中使用了自定義的執行緒池,也同樣會遇到這個問題。
理解了這個知識點後,我們修正這段程式碼的方案是,在程式碼的 finally 程式碼塊中,顯式清除 ThreadLocal 中的資料。這樣一來,新的請求過來即使使用了之前的執行緒也不會獲取到錯誤的使用者資訊了。修正後的程式碼如下:
```java @GetMapping("right") public Map right(@RequestParam("userId") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
} finally {
//在finally程式碼塊中刪除ThreadLocal中的資料,確保資料不串
currentUser.remove();
}
} ``` 重新執行程式可以驗證,再也不會出現第一次查詢使用者資訊查詢到之前使用者請求的 Bug:
ThreadLocal 是利用獨佔資源的方式,來解決執行緒安全問題,那如果我們確實需要有資源線上程之間共享,應該怎麼辦呢?這時,我們可能就需要用到執行緒安全的容器了。
二、併發工具導致的執行緒安全問題
JDK 1.5 後推出的 ConcurrentHashMap,是一個高效能的執行緒安全的雜湊表容器。“執行緒安全”這四個字特別容易讓人誤解,因為 ConcurrentHashMap 只能保證提供的原子性讀寫操作是執行緒安全的。我在相當多的業務程式碼中看到過這個誤區,比如下面這個場景。有一個含 900 個元素的 Map,現在再補充 100 個元素進去,這個補充操作由 10 個執行緒併發進行。開發人員誤以為使用了 ConcurrentHashMap 就不會有執行緒安全問題,於是不加思索地寫出了下面的程式碼:在每一個執行緒的程式碼邏輯中先通過 size 方法拿到當前元素數量,計算 ConcurrentHashMap 目前還需要補充多少元素,並在日誌中輸出了這個值,然後通過 putAll 方法把缺少的元素新增進去。 ```java //執行緒個數
private static int THREAD_COUNT = 10;
//總元素數量
private static int ITEM_COUNT = 1000;
//幫助方法,用來獲得一個指定元素數量模擬資料的ConcurrentHashMap
private ConcurrentHashMap
return LongStream.rangeClosed(1, count)
.boxed()
.collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
(o1, o2) -> o1, ConcurrentHashMap::new));
}
@GetMapping("wrong") public String wrong() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
//初始900個元素
log.info("init size:{}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
//使用執行緒池併發處理邏輯
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//查詢還需要補充多少個元素
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
//補充元素
concurrentHashMap.putAll(getData(gap));
}));
//等待所有任務完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//最後元素個數會是1000嗎?
log.info("finish size:{}", concurrentHashMap.size());
return "OK";
} ```
從日誌中可以看到:
初始大小 900 符合預期,還需要填充 100 個元素。
worker1 執行緒查詢到當前需要填充的元素為 36,竟然還不是 100 的倍數。 worker13 執行緒查詢到需要填充的元素數是負的,顯然已經過度填充了。 最後 HashMap 的總專案數是 1536,顯然不符合填充滿 1000 的預期。
針對這個場景,我們可以舉一個形象的例子。ConcurrentHashMap 就像是一個大籃子,現在這個籃子裡有 900 個桔子,我們期望把這個籃子裝滿 1000 個桔子,也就是再裝 100 個桔子。有 10 個工人來幹這件事兒,大家先後到崗後會計算還需要補多少個桔子進去,最後把桔子裝入籃子。
ConcurrentHashMap 這個籃子本身,可以確保多個工人在裝東西進去時,不會相互影響干擾,但無法確保工人 A 看到還需要裝 100 個桔子但是還未裝的時候,工人 B 就看不到籃子中的桔子數量。更值得注意的是,你往這個籃子裝 100 個桔子的操作不是原子性的,在別人看來可能會有一個瞬間籃子裡有 964 個桔子,還需要補 36 個桔子。
回到 ConcurrentHashMap,我們需要注意 ConcurrentHashMap 對外提供的方法或能力的限制:
使用了 ConcurrentHashMap,不代表對它的多個操作之間的狀態是一致的,是沒有其他執行緒在操作它的,如果需要確保需要手動加鎖。
諸如 size、isEmpty 和 containsValue 等聚合方法,在併發情況下可能會反映 ConcurrentHashMap 的中間狀態。因此在併發情況下,這些方法的返回值只能用作參考,而不能用於流程控制。顯然,利用 size 方法計算差異值,是一個流程控制。諸如 putAll 這樣的聚合方法也不能確保原子性,在 putAll 的過程中去獲取資料可能會獲取到部分資料。
```java @GetMapping("right") public String right() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size:{}", concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//下面的這段複合邏輯需要鎖一下這個ConcurrentHashMap
synchronized (concurrentHashMap) {
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
concurrentHashMap.putAll(getData(gap));
}
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", concurrentHashMap.size());
return "OK";
} ```
可以看到,只有一個執行緒查詢到了需要補 100 個元素,其他 9 個執行緒查詢到不需要補元素,最後 Map 大小為 1000。到了這裡,你可能又要問了,使用 ConcurrentHashMap 全程加鎖,還不如使用普通的 HashMap 呢。
其實不完全是這樣。ConcurrentHashMap 提供了一些原子性的簡單複合邏輯方法,用好這些方法就可以發揮其威力。這就引申出程式碼中常見的另一個問題:在使用一些類庫提供的高階工具類時,開發人員可能還是按照舊的方式去使用這些新類,因為沒有使用其特性,所以無法發揮其威力。
三、併發工具的特性,導致效能降低問題
我們來看一個使用 Map 來統計 Key 出現次數的場景吧,這個邏輯在業務程式碼中非常常見。
使用 ConcurrentHashMap 來統計,Key 的範圍是 10。
使用最多 10 個併發,迴圈操作 1000 萬次,每次操作累加隨機的 Key。
如果 Key 不存在的話,首次設定值為 1。
```java //迴圈次數 private static int LOOP_COUNT = 10000000;
//執行緒數量 private static int THREAD_COUNT = 10;
//元素數量 private static int ITEM_COUNT = 10;
private Map
ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
//獲得一個隨機的Key
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (freqs) {
if (freqs.containsKey(key)) {
//Key存在則+1
freqs.put(key, freqs.get(key) + 1);
} else {
//Key不存在則初始化為1
freqs.put(key, 1L);
}
}
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs;
}
我們吸取之前的教訓,直接通過鎖的方式鎖住 Map,然後做判斷、讀取現在的累計值、加 1、儲存累加後值的邏輯。這段程式碼在功能上沒有問題,但無法充分發揮 ConcurrentHashMap 的威力,改進後的程式碼如下:
java
private Map
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
//利用computeIfAbsent()方法來例項化LongAdder,然後利用LongAdder來進行執行緒安全計數
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//因為我們的Value是LongAdder而不是Long,所以需要做一次轉換才能返回
return freqs.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().longValue())
);
} ```
使用 ConcurrentHashMap 的原子性方法 computeIfAbsent 來做複合邏輯操作,判斷 Key 是否存在 Value,如果不存在則把 Lambda 表示式執行後的結果放入 Map 作為 Value,也就是新建立一個 LongAdder 物件,最後返回 Value。
由於 computeIfAbsent 方法返回的 Value 是 LongAdder,是一個執行緒安全的累加器,因此可以直接呼叫其 increment 方法進行累加。
這樣在確保執行緒安全的情況下達到極致效能,把之前 7 行程式碼替換為了 1 行。 ```java @GetMapping("good") public String good() throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start("normaluse");
Map<String, Long> normaluse = normaluse();
stopWatch.stop();
//校驗元素數量
Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
//校驗累計總數
Assert.isTrue(normaluse.entrySet().stream()
.mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT
, "normaluse count error");
stopWatch.start("gooduse");
Map<String, Long> gooduse = gooduse();
stopWatch.stop();
Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
Assert.isTrue(gooduse.entrySet().stream()
.mapToLong(item -> item.getValue())
.reduce(0, Long::sum) == LOOP_COUNT
, "gooduse count error");
log.info(stopWatch.prettyPrint());
return "OK";
} ``` 這段測試程式碼並無特殊之處,使用 StopWatch 來測試兩段程式碼的效能,最後跟了一個斷言判斷 Map 中元素的個數以及所有 Value 的和,是否符合預期來校驗程式碼的正確性。測試結果如下:
可以看到,優化後的程式碼,相比使用鎖來操作 ConcurrentHashMap 的方式,效能提升了 10 倍。你可能會問,computeIfAbsent 為什麼如此高效呢?答案就在原始碼最核心的部分,也就是 Java 自帶的 Unsafe 實現的 CAS。它在虛擬機器層面確保了寫入資料的原子性,比加鎖的效率高得多:
```java
static final
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
``` 像 ConcurrentHashMap 這樣的高階併發工具的確提供了一些高階 API,只有充分了解其特性才能最大化其威力,而不能因為其足夠高階、酷炫盲目使用。
四、不熟悉併發工具的使用場景,因而導致效能問題
除了 ConcurrentHashMap 這樣通用的併發工具類之外,我們的工具包中還有些針對特殊場景實現的生面孔。一般來說,針對通用場景的通用解決方案,在所有場景下效能都還可以,屬於“萬金油”;而針對特殊場景的特殊實現,會有比通用解決方案更高的效能,但一定要在它針對的場景下使用,否則可能會產生效能問題甚至是 Bug。
之前在排查一個生產效能問題時,我們發現一段簡單的非資料庫操作的業務邏輯,消耗了超出預期的時間,在修改資料時操作本地快取比回寫資料庫慢許多。檢視程式碼發現,開發同學使用了 CopyOnWriteArrayList 來快取大量的資料,而資料變化又比較頻繁。
CopyOnWrite 是一個時髦的技術,不管是 Linux 還是 Redis 都會用到。在 Java 中,CopyOnWriteArrayList 雖然是一個執行緒安全的 ArrayList,但因為其實現方式是,每次修改資料時都會複製一份資料出來,所以有明顯的適用場景,即讀多寫少或者說希望無鎖讀的場景。
如果我們要使用 CopyOnWriteArrayList,那一定是因為場景需要而不是因為足夠酷炫。如果讀寫比例均衡或者有大量寫操作的話,使用 CopyOnWriteArrayList 的效能會非常糟糕。
我們寫一段測試程式碼,來比較下使用 CopyOnWriteArrayList 和普通加鎖方式 ArrayList 的讀寫效能吧。在這段程式碼中我們針對併發讀和併發寫分別寫了一個測試方法,測試兩者一定次數的寫或讀操作的耗時。
```java //測試併發寫的效能 @GetMapping("write") public Map testWrite() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
StopWatch stopWatch = new StopWatch();
int loopCount = 100000;
stopWatch.start("Write:copyOnWriteArrayList");
//迴圈100000次併發往CopyOnWriteArrayList寫入隨機元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("Write:synchronizedList");
//迴圈100000次併發往加鎖的ArrayList寫入隨機元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}
//幫助方法用來填充List
private void addAll(List
list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
}
//測試併發讀的效能
@GetMapping("read") public Map testRead() {
//建立兩個測試物件
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
//填充資料
addAll(copyOnWriteArrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 1000000;
int count = copyOnWriteArrayList.size();
stopWatch.start("Read:copyOnWriteArrayList");
//迴圈1000000次併發從CopyOnWriteArrayList隨機查詢元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
stopWatch.start("Read:synchronizedList");
//迴圈1000000次併發從加鎖的ArrayList隨機查詢元素
IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
} ``` 執行程式可以看到,大量寫的場景(10 萬次 add 操作),CopyOnWriteArray 幾乎比同步的 ArrayList 慢一百倍:
而在大量讀的場景下(100 萬次 get 操作),CopyOnWriteArray 又比同步的 ArrayList 快五倍以上:
你可能會問,為何在大量寫的場景下,CopyOnWriteArrayList 會這麼慢呢?答案就在原始碼中。以 add 方法為例,每次 add 時,都會用 Arrays.copyOf 建立一個新陣列,頻繁 add 時記憶體的申請釋放消耗會很大:
```java /* * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) /
public boolean add(E e) {
synchronized (lock) {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
}
}
```
博文參考
- 極客時間