業務常見error示例——併發工具類庫導致的線程安全問題

語言: CN / TW / HK

摘要

通常提到線程安全問題等就有可能聽到關線程安全和併發工具的一些片面的觀點和結論。比如“把 HashMap 改為 ConcurrentHashMap,就可以解決併發問題了呀”“要不我們試試無鎖的 CopyOnWriteArrayList 吧,性能更好”。的確,為了方便開發者進行多線程編程,現代編程語言會提供各種併發工具類。但如果我們沒有充分了解它們的使用場景、解決的問題,以及最佳實踐的話,盲目使用就可能會導致一些坑,小則損失性能,大則無法確保多線程情況下業務邏輯正確性。

一、線程重用導致用户信息錯亂的Bug

之前有業務同學和我反饋,在生產上遇到一個詭異的問題,有時獲取到的用户信息是別人的。查看代碼後,我發現他使用了ThreadLocal 來緩存獲取到的用户信息。我們知道,ThreadLocal 適用於變量在線程間隔離,而在方法或類間共享的場景。如果用户信息的獲取比較昂貴(比如從數據庫查詢用户信息),那麼在 ThreadLocal 中緩存數據是比較合適的做法。但,這麼做為什麼會出現用户信息錯亂的 Bug 呢?

使用 Spring Boot 創建一個 Web 應用程序,使用 ThreadLocal 存放一個 Integer 的值,來暫且代表需要在線程中保存的用户信息,這個值初始是 null。在業務邏輯中,我先從 ThreadLocal 獲取一次值,然後把外部傳入的參數設置到 ThreadLocal 中,來模擬從當前上下文獲取到用户信息的邏輯,隨後再獲取一次值,最後輸出兩次獲得的值和線程名稱。

```java private static final ThreadLocal currentUser = ThreadLocal.withInitial(() -> null);

@GetMapping("wrong") public Map wrong(@RequestParam("userId") Integer userId) {

//設置用户信息之前先查詢一次ThreadLocal中的用户信息

String before  = Thread.currentThread().getName() + ":" + currentUser.get();

//設置用户信息到ThreadLocal

currentUser.set(userId);

//設置用户信息之後再查詢一次ThreadLocal中的用户信息

String after  = Thread.currentThread().getName() + ":" + currentUser.get();

//彙總輸出兩次查詢結果

Map result = new HashMap();

result.put("before", before);

result.put("after", after);

return result;

} ``` 按理説,在設置用户信息之前第一次獲取的值始終應該是 null,但我們要意識到,程序運行在 Tomcat 中,執行程序的線程是 Tomcat 的工作線程,而 Tomcat 的工作線程是基於線程池的。顧名思義,線程池會重用固定的幾個線程,一旦線程重用,那麼很可能首次從 ThreadLocal 獲取的值是之前其他用户的請求遺留的值。這時,ThreadLocal 中的用户信息就是其他用户的信息。

為了更快地重現這個問題,我在配置文件中設置一下 Tomcat 的參數,把工作線程池最大線程數設置為 1,這樣始終是同一個線程在處理請求:

image.png

隨後用户 2 來請求接口,這次就出現了 Bug,第一和第二次獲取到用户 ID 分別是 1 和 2,如果是按照正常的來説數的應該是的null和2。顯然第一次獲取到了用户 1 的信息,原因就是Tomcat 的線程池重用了線程。從圖中可以看到,兩次請求的線程都是同一個線程:http-nio-8080-exec-1。

image.png

這個例子吿訴我們,在寫業務代碼時,首先要理解代碼會跑在什麼線程上:

我們可能會抱怨學多線程沒用,因為代碼裏沒有開啟使用多線程。但其實,可能只是我們沒有意識到,在 Tomcat 這種 Web 服務器下跑的業務代碼,本來就運行在一個多線程環境(否則接口也不可能支持這麼高的併發),並不能認為沒有顯式開啟多線程就不會有線程安全問題。

因為線程的創建比較昂貴,所以 Web 服務器往往會使用線程池來處理請求,這就意味着線程會被重用。這時,使用類似 ThreadLocal 工具來存放一些數據時,需要特別注意在代碼運行完後,顯式地去清空設置的數據。如果在代碼中使用了自定義的線程池,也同樣會遇到這個問題。

理解了這個知識點後,我們修正這段代碼的方案是,在代碼的 finally 代碼塊中,顯式清除 ThreadLocal 中的數據。這樣一來,新的請求過來即使使用了之前的線程也不會獲取到錯誤的用户信息了。修正後的代碼如下:

```java @GetMapping("right") public Map right(@RequestParam("userId") Integer userId) {

String before  = Thread.currentThread().getName() + ":" + currentUser.get();

currentUser.set(userId);

try {

    String after = Thread.currentThread().getName() + ":" + currentUser.get();

    Map result = new HashMap();

    result.put("before", before);

    result.put("after", after);

    return result;

} finally {
    //在finally代碼塊中刪除ThreadLocal中的數據,確保數據不串
    currentUser.remove();
}

} ``` 重新運行程序可以驗證,再也不會出現第一次查詢用户信息查詢到之前用户請求的 Bug:

image.png

ThreadLocal 是利用獨佔資源的方式,來解決線程安全問題,那如果我們確實需要有資源在線程之間共享,應該怎麼辦呢?這時,我們可能就需要用到線程安全的容器了。

二、併發工具導致的線程安全問題

JDK 1.5 後推出的 ConcurrentHashMap,是一個高性能的線程安全的哈希表容器。“線程安全”這四個字特別容易讓人誤解,因為 ConcurrentHashMap 只能保證提供的原子性讀寫操作是線程安全的。我在相當多的業務代碼中看到過這個誤區,比如下面這個場景。有一個含 900 個元素的 Map,現在再補充 100 個元素進去,這個補充操作由 10 個線程併發進行。開發人員誤以為使用了 ConcurrentHashMap 就不會有線程安全問題,於是不加思索地寫出了下面的代碼:在每一個線程的代碼邏輯中先通過 size 方法拿到當前元素數量,計算 ConcurrentHashMap 目前還需要補充多少元素,並在日誌中輸出了這個值,然後通過 putAll 方法把缺少的元素添加進去。 ```java //線程個數

private static int THREAD_COUNT = 10;

//總元素數量

private static int ITEM_COUNT = 1000;

//幫助方法,用來獲得一個指定元素數量模擬數據的ConcurrentHashMap

private ConcurrentHashMap getData(int count) {

return LongStream.rangeClosed(1, count)

        .boxed()

        .collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),

                (o1, o2) -> o1, ConcurrentHashMap::new));

}

@GetMapping("wrong") public String wrong() throws InterruptedException {

ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);

//初始900個元素

log.info("init size:{}", concurrentHashMap.size());

ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);

//使用線程池併發處理邏輯

forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {

    //查詢還需要補充多少個元素

    int gap = ITEM_COUNT - concurrentHashMap.size();

    log.info("gap size:{}", gap);

    //補充元素

    concurrentHashMap.putAll(getData(gap));

}));

//等待所有任務完成

forkJoinPool.shutdown();

forkJoinPool.awaitTermination(1, TimeUnit.HOURS);

//最後元素個數會是1000嗎?

log.info("finish size:{}", concurrentHashMap.size());

return "OK";

} ```

image.png

從日誌中可以看到:

初始大小 900 符合預期,還需要填充 100 個元素。

worker1 線程查詢到當前需要填充的元素為 36,竟然還不是 100 的倍數。 worker13 線程查詢到需要填充的元素數是負的,顯然已經過度填充了。 最後 HashMap 的總項目數是 1536,顯然不符合填充滿 1000 的預期。

針對這個場景,我們可以舉一個形象的例子。ConcurrentHashMap 就像是一個大籃子,現在這個籃子裏有 900 個桔子,我們期望把這個籃子裝滿 1000 個桔子,也就是再裝 100 個桔子。有 10 個工人來幹這件事兒,大家先後到崗後會計算還需要補多少個桔子進去,最後把桔子裝入籃子。

ConcurrentHashMap 這個籃子本身,可以確保多個工人在裝東西進去時,不會相互影響干擾,但無法確保工人 A 看到還需要裝 100 個桔子但是還未裝的時候,工人 B 就看不到籃子中的桔子數量。更值得注意的是,你往這個籃子裝 100 個桔子的操作不是原子性的,在別人看來可能會有一個瞬間籃子裏有 964 個桔子,還需要補 36 個桔子。

回到 ConcurrentHashMap,我們需要注意 ConcurrentHashMap 對外提供的方法或能力的限制:

使用了 ConcurrentHashMap,不代表對它的多個操作之間的狀態是一致的,是沒有其他線程在操作它的,如果需要確保需要手動加鎖。

諸如 size、isEmpty 和 containsValue 等聚合方法,在併發情況下可能會反映 ConcurrentHashMap 的中間狀態。因此在併發情況下,這些方法的返回值只能用作參考,而不能用於流程控制。顯然,利用 size 方法計算差異值,是一個流程控制。諸如 putAll 這樣的聚合方法也不能確保原子性,在 putAll 的過程中去獲取數據可能會獲取到部分數據。

```java @GetMapping("right") public String right() throws InterruptedException {

ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);

log.info("init size:{}", concurrentHashMap.size());

ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);

forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {

    //下面的這段複合邏輯需要鎖一下這個ConcurrentHashMap

    synchronized (concurrentHashMap) {

        int gap = ITEM_COUNT - concurrentHashMap.size();

        log.info("gap size:{}", gap);

        concurrentHashMap.putAll(getData(gap));

    }

}));

forkJoinPool.shutdown();

forkJoinPool.awaitTermination(1, TimeUnit.HOURS);

log.info("finish size:{}", concurrentHashMap.size());

return "OK";

} ```

image.png

可以看到,只有一個線程查詢到了需要補 100 個元素,其他 9 個線程查詢到不需要補元素,最後 Map 大小為 1000。到了這裏,你可能又要問了,使用 ConcurrentHashMap 全程加鎖,還不如使用普通的 HashMap 呢。

其實不完全是這樣。ConcurrentHashMap 提供了一些原子性的簡單複合邏輯方法,用好這些方法就可以發揮其威力。這就引申出代碼中常見的另一個問題:在使用一些類庫提供的高級工具類時,開發人員可能還是按照舊的方式去使用這些新類,因為沒有使用其特性,所以無法發揮其威力。

三、併發工具的特性,導致性能降低問題

我們來看一個使用 Map 來統計 Key 出現次數的場景吧,這個邏輯在業務代碼中非常常見。

使用 ConcurrentHashMap 來統計,Key 的範圍是 10。

使用最多 10 個併發,循環操作 1000 萬次,每次操作累加隨機的 Key。

如果 Key 不存在的話,首次設置值為 1。

```java //循環次數 private static int LOOP_COUNT = 10000000;

//線程數量 private static int THREAD_COUNT = 10;

//元素數量 private static int ITEM_COUNT = 10;

private Map normaluse() throws InterruptedException {

ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);

ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);

forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
    //獲得一個隨機的Key
    String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
            synchronized (freqs) {      
                if (freqs.containsKey(key)) {
                    //Key存在則+1
                    freqs.put(key, freqs.get(key) + 1);

                } else {
                    //Key不存在則初始化為1
                    freqs.put(key, 1L);
                }
            }
        }
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs;

} 我們吸取之前的教訓,直接通過鎖的方式鎖住 Map,然後做判斷、讀取現在的累計值、加 1、保存累加後值的邏輯。這段代碼在功能上沒有問題,但無法充分發揮 ConcurrentHashMap 的威力,改進後的代碼如下:java private Map gooduse() throws InterruptedException {

ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);

ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);

forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
    String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
    //利用computeIfAbsent()方法來實例化LongAdder,然後利用LongAdder來進行線程安全計數
    freqs.computeIfAbsent(key, k -> new LongAdder()).increment();

        }

));

forkJoinPool.shutdown();

forkJoinPool.awaitTermination(1, TimeUnit.HOURS);

//因為我們的Value是LongAdder而不是Long,所以需要做一次轉換才能返回

return freqs.entrySet().stream()

        .collect(Collectors.toMap(

                e -> e.getKey(),

                e -> e.getValue().longValue())

        );

} ```

使用 ConcurrentHashMap 的原子性方法 computeIfAbsent 來做複合邏輯操作,判斷 Key 是否存在 Value,如果不存在則把 Lambda 表達式運行後的結果放入 Map 作為 Value,也就是新創建一個 LongAdder 對象,最後返回 Value。

由於 computeIfAbsent 方法返回的 Value 是 LongAdder,是一個線程安全的累加器,因此可以直接調用其 increment 方法進行累加。

這樣在確保線程安全的情況下達到極致性能,把之前 7 行代碼替換為了 1 行。 ```java @GetMapping("good") public String good() throws InterruptedException {

StopWatch stopWatch = new StopWatch();

stopWatch.start("normaluse");

Map<String, Long> normaluse = normaluse();

stopWatch.stop();

//校驗元素數量
Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");

//校驗累計總數    
Assert.isTrue(normaluse.entrySet().stream()

                .mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT

        , "normaluse count error");

stopWatch.start("gooduse");

Map<String, Long> gooduse = gooduse();

stopWatch.stop();

Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");

Assert.isTrue(gooduse.entrySet().stream()

                .mapToLong(item -> item.getValue())

                .reduce(0, Long::sum) == LOOP_COUNT

        , "gooduse count error");

log.info(stopWatch.prettyPrint());

return "OK";

} ``` 這段測試代碼並無特殊之處,使用 StopWatch 來測試兩段代碼的性能,最後跟了一個斷言判斷 Map 中元素的個數以及所有 Value 的和,是否符合預期來校驗代碼的正確性。測試結果如下:

image.png

可以看到,優化後的代碼,相比使用鎖來操作 ConcurrentHashMap 的方式,性能提升了 10 倍。你可能會問,computeIfAbsent 為什麼如此高效呢?答案就在源碼最核心的部分,也就是 Java 自帶的 Unsafe 實現的 CAS。它在虛擬機層面確保了寫入數據的原子性,比加鎖的效率高得多: ```java static final boolean casTabAt(Node[] tab, int i, Node c, Node v) {

    return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);

}

``` 像 ConcurrentHashMap 這樣的高級併發工具的確提供了一些高級 API,只有充分了解其特性才能最大化其威力,而不能因為其足夠高級、酷炫盲目使用。

四、不熟悉併發工具的使用場景,因而導致性能問題

除了 ConcurrentHashMap 這樣通用的併發工具類之外,我們的工具包中還有些針對特殊場景實現的生面孔。一般來説,針對通用場景的通用解決方案,在所有場景下性能都還可以,屬於“萬金油”;而針對特殊場景的特殊實現,會有比通用解決方案更高的性能,但一定要在它針對的場景下使用,否則可能會產生性能問題甚至是 Bug。

之前在排查一個生產性能問題時,我們發現一段簡單的非數據庫操作的業務邏輯,消耗了超出預期的時間,在修改數據時操作本地緩存比回寫數據庫慢許多。查看代碼發現,開發同學使用了 CopyOnWriteArrayList 來緩存大量的數據,而數據變化又比較頻繁。

CopyOnWrite 是一個時髦的技術,不管是 Linux 還是 Redis 都會用到。在 Java 中,CopyOnWriteArrayList 雖然是一個線程安全的 ArrayList,但因為其實現方式是,每次修改數據時都會複製一份數據出來,所以有明顯的適用場景,即讀多寫少或者説希望無鎖讀的場景。

如果我們要使用 CopyOnWriteArrayList,那一定是因為場景需要而不是因為足夠酷炫。如果讀寫比例均衡或者有大量寫操作的話,使用 CopyOnWriteArrayList 的性能會非常糟糕。

我們寫一段測試代碼,來比較下使用 CopyOnWriteArrayList 和普通加鎖方式 ArrayList 的讀寫性能吧。在這段代碼中我們針對併發讀和併發寫分別寫了一個測試方法,測試兩者一定次數的寫或讀操作的耗時。

```java //測試併發寫的性能 @GetMapping("write") public Map testWrite() {

List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();

List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());

StopWatch stopWatch = new StopWatch();

int loopCount = 100000;

stopWatch.start("Write:copyOnWriteArrayList");

//循環100000次併發往CopyOnWriteArrayList寫入隨機元素

IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));

stopWatch.stop();

stopWatch.start("Write:synchronizedList");

//循環100000次併發往加鎖的ArrayList寫入隨機元素

IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));

stopWatch.stop();

log.info(stopWatch.prettyPrint());

Map result = new HashMap();

result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());

result.put("synchronizedList", synchronizedList.size());

return result;

}

//幫助方法用來填充List

private void addAll(List list) {

list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));

}

//測試併發讀的性能

@GetMapping("read") public Map testRead() {

//創建兩個測試對象

List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();

List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());

//填充數據

addAll(copyOnWriteArrayList);

addAll(synchronizedList);

StopWatch stopWatch = new StopWatch();

int loopCount = 1000000;

int count = copyOnWriteArrayList.size();

stopWatch.start("Read:copyOnWriteArrayList");

//循環1000000次併發從CopyOnWriteArrayList隨機查詢元素

IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));

stopWatch.stop();

stopWatch.start("Read:synchronizedList");

//循環1000000次併發從加鎖的ArrayList隨機查詢元素

IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));

stopWatch.stop();

log.info(stopWatch.prettyPrint());

Map result = new HashMap();

result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());

result.put("synchronizedList", synchronizedList.size());

return result;

} ``` 運行程序可以看到,大量寫的場景(10 萬次 add 操作),CopyOnWriteArray 幾乎比同步的 ArrayList 慢一百倍:

image.png

而在大量讀的場景下(100 萬次 get 操作),CopyOnWriteArray 又比同步的 ArrayList 快五倍以上:

image.png

你可能會問,為何在大量寫的場景下,CopyOnWriteArrayList 會這麼慢呢?答案就在源碼中。以 add 方法為例,每次 add 時,都會用 Arrays.copyOf 創建一個新數組,頻繁 add 時內存的申請釋放消耗會很大:

```java /* * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) /

public boolean add(E e) {

    synchronized (lock) {

        Object[] elements = getArray();

        int len = elements.length;

        Object[] newElements = Arrays.copyOf(elements, len + 1);

        newElements[len] = e;

        setArray(newElements);

        return true;

    }

}

```

博文參考

  • 極客時間