王者併發課-星耀02:窮理盡妙-解構同步器設計原理與AQS淺析

語言: CN / TW / HK

theme: v-green highlight: atom-one-dark


歡迎來到《王者併發課》,本文是該系列文章中的第28篇,星耀中的第2篇

說起JUC中大名鼎鼎的AQS(AbstractQueuedSynchronizer),相信大家對其並不陌生,說是如雷貫耳也不為過。然而,當我們需要走近它的時候,大部分人或望而卻步,或繞道而行。原因在於,雖然我們有了解它的慾望,但想把它搞清楚又似乎總是不容易。正所謂撼山易,撼AQS難

作為星耀系列的文章,解決的正是此類難解之題。所以,我們將在這篇文章中邀你一起,用庖丁解牛的方式來逐步剖析AQS的表象與核心,理解AQS的原理及應用。在本文中,我們不會開篇就直搗黃龍、解讀原始碼,而是從最基本的同步說起,並由同步循序漸進地講解同步器、CLH佇列和AQS的基本構成,以及AQS在ReentrantLock和CountDownLatch中的應用,最終實現解構AQS的目的

一、何為同步

在討論所謂的同步概念之前,我們再來回顧經典的醫院就診的場景。

話說冤家路窄的開和趙子龍在林間相遇,話不投機三句開打。數百個來也不見勝負,但兩人都已經披紅掛綵,只好休戰同去醫院看大夫。

眾所周知,醫院向來是醫生少患者多,經常人滿為患,就算武功高強也得講究先來後到。於是,趙子龍和鎧先在掛號處取了號,一看就診室有人就診,外面還有長隊,只好面面相覷加入到等候的隊伍中。

說到這,相信你對醫院這個機制並不陌生,是不是很簡單?但我們要說的是,這個場景體現的就是經典的 同步機制 ,理解這個機制你對 AQS就已經理解了一半。不信?往下看。

image-20220417161352574

在併發程式設計中,說起“同步”,有些人可能總覺得這個詞有些彆扭。沒錯,所謂同步是synchronize 的翻譯。但是,對於這個英文單詞,我們的理解往往又不是很準確,於是聽起來甚至有些彆扭。因此,我們首先要能正確理解這個詞的含義,才能進一步明白同步器是個啥玩意。

在中文語境中,我們說同步,往往指的是兩個或兩個以上隨時間變化的量在變化過程中保持一定的相對關係。比如,把本地的資料同步到雲端,或者是從手機同步到電腦。看到沒有,中文語境中的同步往往指的是同一事物在不同位置的相對變化,比如手機中的照片同步到雲端,照片是不是還是那個照片?注意,它說的是同一事物。

但是,在英文中synchronize 的含義則完全不同於我們日常所理解的同步,即使我們把它翻譯為 “同步” 。請看英文中對synchronize 的解釋:

image-20220419175923019

看到英文版釋義中的at the same time,相信你對synchronize 這個詞的準確含義已經豁然開朗,並瞭然於心。原來,所謂同步就是搞定併發的意思,是兩個事件同時發生的關係。而在電腦科學中,同步器則是用於處理同步關係的結構和演算法。

基於對同步的理解,可以發現前述的醫生就診問題也是一種同步機制的設計問題,即如何處理多名患者同時看一個醫生,這裡就需要考慮兩個關鍵點:醫生的就診狀態患者的佇列。下面我們通過ReentrantLock來模擬這個就診問題:

  • 如果患者通過qualificationLock.lock()獲得了就診資格(比如此刻沒有就診),那麼他可以直接就診;
  • 如果患者未能通過qualificationLock.lock()獲得就診資格,則患者需要進入qualificationLock中的排隊機制進行等待。

你看,基於AQS實現的ReentrantLock用起來很簡單,其解決問題的思路也很樸素,無非是我們日常也能遇到的問題,所以我們先從樸素的問題上認識AQS。

```java / * 門診就診(就診+排隊)同步示例 */ public class OutpatientDemo { / * 當前就診資格 */ private final ReentrantLock qualificationLock = new ReentrantLock();

public void 就診() {
    qualificationLock.lock();  // 如果門診沒有人,則獲取當前就診資格,直接就診。否則,將進入大廳佇列排隊等候。
    try {
        // ... 就診中
    } finally {
        qualificationLock.unlock(); // 就診結束離開後,釋放資格。
    }
}

} ```

二、同步器的設計思路

對於普遍存在的同步問題,我們可以根據具體問題設計不同的同步機制,也就是所謂的同步器。比如,醫生通過取號機、叫號器、顯示屏和等候區來解決多人排隊問題,去銀行辦理業務也是類似的機制。當然,這些都是我們生活中的同步機制。而在Java中,我們則可以通過AQS和synchronize關鍵字來實現同步。所以,理解同步器的設計思路是後續理解AQS的關鍵,而當我們從現實世界的角度看程式世界時,問題則相對更容易理解。

(一)同步器的基本組成

無論是現實世界還是程式世界,在同步器設計的設計上總體是相似的。比如,都有狀態佇列,以及圍繞這兩個點再增加一些其他輔助機制。所以,從抽象的角度看,一個同步器應該具備以下四要素:

  • 臨界狀態(State):用於確定是否可以授予訪問物件(比如執行緒、患者等)的訪問權;
  • 臨界狀態訪問控制(Access Condition):用於判斷訪問物件是否滿足訪問臨界狀態的條件。比如,當就診室狀態為“就診中”時,則不允許就診人進入,反之則允許;
  • 臨界狀態可變更(State Changes):當執行緒獲得了對臨界區的訪問許可權,它必須更改同步器的狀態,以(可能)阻止其他執行緒進入它。換句話說,狀態需要反映執行緒現在正在臨界區內執行的事實。比如,當就診室中有人就診時,其狀態就必須為“就診中”,而就診結束時則應該切換到“待就診”;
  • 狀態變更可通知(Notification Strategy):當執行緒變更了同步器的狀態時,應當以合適的策略通知到其他執行緒。比如,當前就診人結束就診離開診室後,應當通過螢幕或廣播等方式通知佇列中等待的其他就診人。

當然,這四要素只是在設計同步器時的基本思路和原則,具體同步器的設計並不侷限於以上的四要素,可能沒有其中的部分組成,也可能會有其他的組成等。

image-20220417160430440

(二)佇列的選型

現在,我們已經理解設計同步器的要素,以及其必要的組成,比如狀態和佇列。對於狀態,相對比較簡單也好理解,比如AQS中的狀態只是一個int型別的欄位。而對於佇列的理解,則是關鍵所在,如何設計有效的排隊機制,關乎到資料的安全性、同步的效率以及擴充套件性等多方面的考量。所以,這部分我們要講的就是如何設計佇列,包含無佇列和CLH佇列等。

1. 無佇列

所謂無佇列設計,即下面的左圖所示,不同的執行緒為了爭取到自己需要的資源會一擁而上,毫無秩序可言。在無佇列設計中,力氣大、臉皮厚的執行緒可能會優先獲得資源,而素質高的執行緒則可能一直在禮讓,導致始終無法獲得資源,也就是我們常說的“飢餓”,因為它不公平。同時,無佇列也會導致大量的執行緒阻塞,對於系統來說易引發災難。

無佇列下的執行緒競爭情況可以用下面這幅圖來說明,每個執行緒通過自旋鎖來請求資源。所謂自旋,並不是蒙上眼睛原地轉圈的意思,而是它會主動地、時不時地去詢問資源是否就緒,就像我們時不時跑到門口詢問“到我了嗎”。

自旋鎖的優點在於實現簡單,也不需要複雜的排程和佇列管理。但是,它的缺點則在於:

  • 鎖飢餓:在鎖的競爭中,有的執行緒可能會始終被插隊,導致飢餓;
  • 效能堪憂:多個執行緒同時輪詢狀態時,一個是消耗執行緒所在的CPU資源,一個是導致多個CPU的快取記憶體頻繁同步,影響CPU效率。

所以,自旋鎖適用於競爭不激烈、鎖持有時間短的場景,像AQS這種為各場景提供基礎同步能力的同步器,自然不適合採用這種方式。

image-20220419164936239

既然無佇列會存在無序競爭的情況,那如何解決這個問題?如果是我們來設計,如何設計這個佇列,如何保證執行緒的公平性併兼顧效能?這就要說說CHL隊列了。

2. 理解CLH佇列鎖

CLH佇列鎖是由Craig, Landin, and Hagersten三人共同設計的一種佇列鎖機制,我們先來看看CLH佇列的基本模樣,如下圖所示。

image-20220422202346791

從圖中可以看到,在CLH佇列中每個執行緒都是一個節點(Node),在Node中同時還有節點的前驅指標。從結構上看,CLH佇列和普通佇列似乎沒有區別。但是,這個佇列需要解決一個核心問題:如何解決鎖的競爭和釋放中的效能問題。 所有節點都要自主去競爭?顯然不可能。這好比我們在候診區等待時,時不時都去問問醫生”到我了嗎“,如果這麼做我們沒累癱,醫生已經被煩死。

解決這個問題有兩個思路:一個是廣播通知下一位患者,比如醫院的大屏和喇叭通知;另一個則是我們可以看排在我們前面的人,比如排在我們前面的人已經進去就診了,那下一位必然是輪到我,不然還有誰? 這兩種思路,醫院資訊化系統通常採用的是第一種,而計算機中的CLH佇列鎖採用的則是第二種:既然大家都輪詢同一把鎖效率低,那我們為何不能只輪詢各自的前序節點狀態呢?這就是CLH佇列鎖的精髓所在。

現在,我們結合下面這幅圖用計算機的語言來描述它的原理和精髓:所謂CLH鎖是通過移動尾部節點實現的FIFO佇列,每個節點包含了執行緒、前序節點資訊以及各自的狀態。CLH佇列中的各節點不會輪詢同一個共享變數,而是僅輪詢各自的本地變數,從而解決效率的問題。此外,FIFO屬性也保障了CLH佇列的公平性。

image-20220422202312291

當然,要實現CLH佇列鎖的高效和公平性,我們在構建佇列時,就需要考慮如何設計佇列、如何入隊、如何出隊和如何喚醒節點等一系列問題,而這些問題正是AQS中的一些關鍵問題,我們將在後文中逐漸展開敘述。

補充資訊:現代計算機通常是多CPU架構,各自CPU有著自己的快取記憶體。當不同的執行緒位於不同的CPU時,執行緒間交換資料時就需要特別考慮效能問題。CLH佇列鎖針對某些架構是高效的,但換了其他架構則未必,因此我們要了解這部分知識和它的侷限性。這裡沒有說具體的哪種架構,是為避免在本文將問題擴大化,引入太多額外知識會增加不必要的負擔,有興趣的同學可以自行檢索這方面的知識。

  • CLH佇列鎖擴充套件閱讀:https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/CLH.pdf

3. AQS中的CLH佇列鎖

現在,我們知道CLH佇列鎖有著效率高、公平和實現簡單等優點,那它有沒有缺點呢?當然有。CLH佇列的主要缺點在於:一是節點的自旋影響CPU的效率;二是它的實現過於簡單,不能滿足AQS中複雜的場景需要,比如AQS中節點的阻塞和喚醒等。因此,AQS採用了CLH鎖,但是對它進行了一些改造和擴充套件,主要是通過節點狀態waitStatus來豐富佇列的操作性

所以,關鍵的問題就來了:如何設計重新CLH佇列、如何解決入隊出隊等一系列問題?

三、初始AQS

(一)基本用途

AQS的全稱是AbstractQueuedSynchronizer,即抽象佇列同步器。這個名字有三個單詞,其中Abstract表示它是一個抽象類,AQS原始碼中定義了一些具體的方法,也定義了一些抽象的方法。換句話說,我們不能單獨地直接AQS,而是需要繼承它並實現部分能力

在JAVA的JUC中,AQS的使用非常廣泛,在我們前面的系列文章都有提到,比如SemaphoreReentrantLockReentrantReadWriteLockThreadPoolExecutorCountDownLatch等,相關類對AQS的使用如下圖所示。不誇張地說,在我們需要使用同步的時候,其背後幾乎都有AQS的影子,只不過我們在前面都沒有展開說,後文我們會選擇其中兩個來展開詳述。

image-20220417153748486

(二)總體結構

在前面的系列文章中,我們已經瞭解設計同步器時的四大核心要素。在AQS中,也仍然遵循這幾要素。因此雖然AQS看齊來很複雜,原始碼洋洋灑灑幾千行,但是如果我們分析它的資料結構和方法會發現,其核心就是StateQueue這兩個,其他都是圍繞它們倆展開。所以,我們理解AQS務必要抓住其核心所在,理解其核心和外圍,不要眉毛鬍子一把抓,更不要慌。

image-20220417160750212

接著上面的分析,我們來對AQS的原始碼和設計進行歸類和總結。為了更好的結構化理解AQS,我們可以將其拆解為下面的5層。很顯然,AQS中的方法眾多,因此我們將相關的方法進行歸類,這樣在理解時會有側重點。其中,綠色部分為需要重點關注的方法,波浪線打底的方法是需要子類實現的方法,而實線打底的方法則是AQS中提供的重要方法。

image-20220425112153483

1. 核心部件之狀態位

作為AQS的核心之一,同步狀態欄位state的重要性不言而喻。注意,state欄位有兩點需要注意:

  • voliatile修飾:不同執行緒在讀寫該欄位時,欄位的最新值對各執行緒可見;
  • int型別:採用int型別是比較巧妙的設計,用於表示當前同步的狀態情況。在AQS中,同步有獨佔和共享兩種模式。其中,在獨佔模式下,state的值為01即可;但是,在共享模式下,state的值則會大於1,比如某個具有超能力加成的大夫同時可以看10個病人,那麼state的值就為10

另外,還需要注意的是,state表示的是同步的狀態,而不是執行緒的狀態,執行緒的狀態在佇列的節點中,對此不要搞混淆

java /** * The synchronization state. */ private volatile int state;

2. 核心部件之同步佇列

AQS中有三個核心的欄位,除了上述的state之外,還有兩個分別是Node型別的headtail. 注意,AQS並沒有所謂的queue欄位,而是用headtail來表示佇列,有頭有尾,不就是佇列麼...那麼,headtail所構成的佇列是怎樣的?我們通過原始碼和圖示來說明。

在AQS的原始碼中,headtail均由transient volatile來修飾,這點和state是一致的,表示對其他執行緒可見。另外還需要注意的是:

  • 延遲初始化(lazily initialized)headtail均是在需要時才會初始化,而不是在AQS例項化即初始化。在後面我們會提到,並不是在所有情況下都需要佇列,在某些情況下AQS是不需要建立的,所以它們都是延遲初始化;
  • 由方法提供賦值入口:任何執行緒均不可以直接修改headtail的值,而必須通過AQS提供的方法入口來完成;

```java /* 佇列頭部 * 1. 延遲初始化; * 2. 經由方法提供賦值入口; * 3. 頭部節點狀態不可以為Canceled. / private transient volatile Node head;

/* 佇列尾部 * 1. 延遲初始化; * 2. 經由方法提供賦值入口; / private transient volatile Node tail; ```

AQS中由headtail構成的佇列圖示如下所示。圖中可以看到,stateheadtail構成了AQS中三個關鍵欄位。其中,headtail又是CHL佇列的關鍵欄位,佇列中的節點通過前驅和後繼的方式完成節點間關係的連線。

image-20220512180121207

3. 核心節點結構

作為AQS核心的資料結構之一,Node的組成如下圖所示,包含了waitStatusthreadprevnextnextWaiter幾個關鍵屬性。其中,waitStatus表示當前節點執行緒的狀態,thread表示當前執行緒,而prevnext分別代表節點的前驅和後繼。

image-20220417160809625

Node中的waitStatus是個重要且容易誤解的屬性,它有5個列舉值,為了方便理解,我們把可以這5個值以0為界分為三個區間來理解:

| 區間|狀態|說明| | --- |--- | --- | | waitStatus=0 | 初始化狀態 | 通過new Node()建立節點時,此時狀態為0.| | waitStatus>0 | 取消狀態 | 執行緒狀態無效,該執行緒被中斷或者等待超時,需要移除該執行緒節點。| | waitStatus<0 | 有效狀態 | 包括-1、-2、-3等值。其中,-1表示該執行緒處於同步佇列且可以被喚醒的狀態,-2表示節點在條件佇列中,-3用於共享模式,表示可以傳播到下個節點。|

另外,關於nextWaiter屬性,這裡要先補充說明的是AQS中其實有兩種型別的佇列:同步佇列和條件佇列。我們目前主要討論的都是同步佇列,條件佇列會在本文末尾討論,而nextWaiter主要用於條件佇列。在前面圖示的同步佇列中,佇列是雙向佇列,由prevnext表示當前節點的前驅和後繼,而條件佇列則是單向佇列,通過nextWaiter指向下一個節點,限於篇幅更多細節不在此處描述。

Node核心原始碼如下所示:

```java static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null;

/** 執行緒已經取消 */
static final int CANCELLED =  1;
/** 執行緒需要喚醒 */
static final int SIGNAL    = -1;
/** 執行緒條件等待中*/
static final int CONDITION = -2;
/**
 * 用於共享模式,表示可以傳播到下個節點
 */
static final int PROPAGATE = -3;

/** 節點狀態欄位 */
volatile int waitStatus;

/** 當前節點的前驅節點 */
volatile Node prev;

  /** 當前節點的後繼節點 */
volatile Node next;

/** 節點中的執行緒 */
volatile Thread thread;

/**
    用於條件佇列,指向下一個等待節點
 */
Node nextWaiter;

Node() { 
}

} ```

四、理解AQS的獨佔模式-以ReentrantLock為例

前面,我們講述了AQS的基本組成和和核心資料結構。在這部分內容中,我們以ReentrantLock為例來分析AQS的內部機制,通過具體的示例有助於我們理解抽象的AQS。

(一)基本用法

下面是前文所述的就診示例程式碼。在這段程式碼中,只有當患者獲得鎖之後才能進入診室就診,否則需要排隊等待,而就診結束後則需要將鎖釋放,讓其他等待的患者進入。示例程式碼只有幾行,可以說是相當精簡,非常容易理解。但是,其關鍵就在於lock()unlock()兩個方法中,這兩個方法則是由ReentrantLock和AQS協作完成。所以,接下來我們在分析原始碼時,將會分別分析這兩個部分。

```java / * 門診就診(就診+排隊)同步示例 */ public class OutpatientDemo { / * 當前就診資格 */ private final static ReentrantLock qualificationLock = new ReentrantLock();

public void 就診() {
    qualificationLock.lock();  // 如果門診沒有人,則獲取當前就診資格,直接就診。否則,將進入大廳佇列排隊等候。
    try {
        // ... 就診中
    } finally {
        qualificationLock.unlock(); // 就診結束離開後,釋放資格。
    }
}

} ```

(二)加鎖解析

1. ReentrantLock部分原始碼解析

下面是精簡後的ReentrantLock的lock()原始碼,我們移除了和lock無關的註釋和其他原始碼,以減少對理解的干擾。在原始碼中,我們可以看到一些關鍵資訊:

  • ReentrantLock實現了Lock介面;

  • ReentrantLock中的核心屬性是sync而Sync類是其內部類,並繼承了AQS

此外,我們還注意到ReentrantLock提供了公平非公平兩種模式 ,預設是非公平模式,所以前述示例程式碼使用的是非公平模式。那如何區別所謂的公平和非公平?從原始碼中,我們可以清晰地看到在非公平模式下,執行緒在請求鎖時並不是立即排隊,而是通過compareAndSetState嘗試加鎖,如果失敗再去排隊。這就像有人總愛找關係直接去找醫生,但是被拒絕之後又乖乖去排隊,沒有直接排隊就是不公平

注意,在這個過程中,執行緒嘗試加鎖是ReentrantLock中的原始碼而失敗後通過acquire排隊時則將進入AQS的原始碼

```java public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {        
    abstract void lock();
}

static final class NonfairSync extends Sync {
    final void lock() {
        if (compareAndSetState(0, 1)) // 自己先處理
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1); // 不行再呼叫AQS方法
    }
}

public ReentrantLock() {
    sync = new NonfairSync();
}

public void lock() {
    sync.lock();
}

} ```

以上就是ReentrantLock加鎖時的內部關鍵原始碼,比較簡單,主要是融合Lock介面AQS同步器。接下來,我們再順著acquire方法進入AQS的原始碼中一探究竟。

2. AQS部分原始碼解析

acquire是AQS資源搶佔的重要方法入口,但它原始碼也很簡短,只有三行。然而,通過這三行程式碼,我們可以看出其中的重要過程:

  • 排隊前不死心,通過tryAcquire嘗試再次搶佔鎖。雖然成功的概率有限,但是萬一成功就沒佇列和排隊什麼事了,豈不痛快?!
  • tryAcquire搶佔失敗後,乖乖去排隊;
  • 排隊時,先將當前執行緒通過addWaiter方法封裝成一個節點,再通過acquireQueued方法將自己加入到佇列中;
  • 如果搶佔失敗,排隊也失敗,則徹底死心,就地中斷自己

java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

以上這幾行便是acquire的核心要義,理解要義之後我們再來分析addWaiter等其中的細節。這裡再次提醒,通過acquire搶佔鎖時並不是總要排隊的,只有當搶佔失敗後才會排隊換句話說,並不是AQS中的所有場景都需要使用到佇列

通過addWaiter原始碼理解入隊邏輯

addWaiter的主要職責是根據指定的模式將當前執行緒封裝成節點並入隊。這裡的關鍵在於:

  • 封裝節點前要指定模式:獨佔或者共享。我們示例中使用的是獨佔模式(exclusive)

java private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 先嚐試快速入隊,如果失敗再通過enq走常規入隊模式 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

從寥寥無幾的原始碼中,我們可以看到入隊的核心方法是enq,但是addWaiter在呼叫enq之前,會先嚐試直接加入到隊尾的方式直接入隊,如果失敗再執行enq。那為什麼要這麼做呢?

原因是為了提高入隊效率。因為enq入隊使用的是for迴圈的方式,所以為了避免進入迴圈,那自然是能直接入隊最好。

比如,當趙子龍準備排隊時,趙子龍看到排在隊尾的是孫尚香,如果子龍走到隊伍時隊尾仍然是她,則子龍可以快速加入隊伍。但是,假如子龍走到隊伍時隊尾是妲己,妲己的前面才是孫尚香。很顯然,幾步路的功夫隊伍已經發生了變化。這個時候,如果子龍強行插隊排到孫尚香的後面,可能會享受來自妲己的三連魔法攻擊。那怎麼辦呢?

當然是遵守江湖規矩,按規矩排隊到隊尾,拒絕插隊。而這,便是enqacquireQueued的核心要義:堅持按規矩排到佇列的末尾,沒有佇列那就建立佇列

java private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 佇列不存在,需要初始化 // 思考:這裡為什麼要設定一個空的節點作為頭結點?稍後解釋 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

3. 佇列變化解析

關於enqacquireQueued入隊時的核心佇列變化,為了便於你的理解,我們製作了下面這幅圖,圖中的四個步驟反映的是佇列的變化過程,我們以執行緒t1入隊來分析這個過程:

  1. 初始狀態下,AQS佇列中的headtail指向的都是NULL;

  2. 構建首節點,這個節點是個空節點,佇列的頭部指向首節點;

  3. 佇列的尾部指向首節點;

  4. 構建執行緒t1節點,並將t1節點的前驅指向首節點,而尾部節點則指向t1節點。

image-20220509144244096

此處需要特別注意的是,在AQS同步佇列中有一個所謂的 首節點,在初始化佇列時會首先建立這個首節點。這個節點的存在非常有意思,因為它其實是個空節點(thread=null),但又極其容易被誤解卻又經常被問起

看到這裡,我們不禁要問既然是空節點,那它存在的意義是什麼?說起這個問題,我們還是要回到前述的就診排隊問題。

理解AQA同步佇列中的首節點【重要】

我們首先來思考一個問題:在排隊就診的佇列中,真的是所有人都在排隊嗎?如果是,那醫生在幹嘛呢?如果不是,那隊伍最前面的那個人是什麼狀態?那究竟是還是不是?當然不是,進入首節點意味著出隊

在排隊的就診佇列中,最前面的那個人當然不是在排隊,他在醫生那裡就診。對不對?所以,在AQS中,這個正在就診的人就是那個空的首節點,是他鎖定了資源,他的執行緒是正在執行中的,而不是等待中。這就是首節點,它不是排隊中的節點,這點非常容易誤解。

image-20220509153500427

接下來,我們從enqacquireQueued的原始碼中理解這點。在enq原始碼中,我們可以看到到當佇列不存在(t==null)時,會執行compareAndSetHead(new Node())來設定空的首節點。

java private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node()))// 設定空的首節點 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 而在acquireQueued入隊時,會執行tryAcquire(arg)來嘗試搶佔資源,如果成功則會執行setHead(node)將自己設定為首節點。

java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 設定當前節點搶佔資源成功,設定為首節點 setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 那麼,這個setHead(node)做了什麼呢?下面的原始碼已經寫得清清楚楚。原始碼只有三行,但是我們要特別注意的是,不要認為setHead(node)意味著只是排到了佇列的頭部,它其實意味著出隊、出隊和出隊所以,這個方法通常是由acquire成功後呼叫

而一旦搶佔資源成功後,則不需要再排隊,所以在當前節點變為首節點後,會通過第二行的node.thread = null將其執行緒置為空。如此,當該執行緒執行結束後,當前節點不再有其他引用,可以輔助垃圾回收

java private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }

4. 總體流程圖示

以上就是ReentrantLock中關於加鎖的整體過程。在這個過程中,由ReentrantLock和AQS兩部分程式碼共同完成。理解加鎖的過程,重點在於理解其中的核心思想和步驟,比如哪些是由ReentrantLock完成的、哪些是AQS完成的、佇列是如何設計的、首節點的意義是什麼等等。

為了方便你理解加鎖的過程,我們製作了下面這幅圖,圖中展示了一些過程中的一些關鍵步驟。

image-20220417160728078

(三)鎖的釋放解析

在上部分內容中,我們通過ReentrantLock分析了AQS的加鎖過程,在這部分我們仍然結合兩者再來探索AQS的解鎖過程,即當我們執行qualificationLock.unlock()時發生了什麼。

1. ReentrantLock部分原始碼解析

ReentrantLock中的解鎖入口是其unlock方法,在這個方法中我們可以看到ReentrantLock本身沒有其他的處理邏輯,而是直接呼叫了AQS的release方法。但是,AQS的release會呼叫tryRelease,而tryRelease的實現則是由ReentrantLock實現,所以ReentrantLock原始碼中我們要重點關注的只有tryRelease這個方法。

tryRelease在實現上並不複雜,關鍵點在於當狀態為0時則視為釋放成功,而且非當前搶佔執行緒不允許釋放。

```java public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync;

  abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 釋放成功
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}

public void unlock() {
    sync.release(1);
}

} ```

2. AQS部分原始碼解析

AQS原始碼部分的重點則在於release方法,它主要呼叫子類的tryRelease,當子類判定釋放成功後,則喚醒後繼節點執行緒。

java public final boolean release(int arg) { if (tryRelease(arg)) { // 呼叫子類實現 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 喚醒後繼節點 return true; } return false; } unparkSuccessor方法主要用於喚醒後繼節點。需要注意的是,在喚醒過程中可能有的節點已經取消或者為null,那麼這個時候會依次向後尋找有效的節點來喚醒

```java private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);

Node s = node.next; // 找到待喚醒的後繼節點
if (s == null || s.waitStatus > 0) { 
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev) // 後繼節點可能為null或狀態錯誤,則迴圈向後查詢
        if (t.waitStatus <= 0)
            s = t;
}
if (s != null) // 找到有效的後繼節點,喚醒它
    LockSupport.unpark(s.thread);

} ```

3. 總體流程圖示

下面是ReentrantLock釋放鎖時的流程圖,相對於加鎖而言,鎖的釋放要簡單些。流程中的重點在於要區分哪些是ReentrantLock完成的,哪些是AQS完成的,以及如何喚醒後繼節點

image-20220418120751848

五、理解AQS的共享模式-以Semaphore為例

現在,我們已經知道,AQS中的核心屬性之一state是個int型別的變數,並且AQS的同步狀態支援獨佔模式和共享模式,而在ReentrantLock中我們已經理解AQS在獨佔模式下的工作原理,所以在這部分我們將藉助Semaphore來理解AQS中的共享模式。(如果你對Semaphore不甚瞭解,可以查閱王者併發課系列的相關專題文章)

(一)基本用法

通常,我們在醫院就診時往往是一個診室同時僅問診一個患者,所以我們可以通過ReentrantLock來模擬這一場景。但是,假如門診裡有兩名醫生,可以同時問診兩名患者。那麼,這種場景下我們就可以使用Semaphore來模擬,而Semaphore的背後正是AQS的共享模式。相關示例程式碼如下所示,診室同時允許兩名患者進入。

```java / * 門診就診(就診+排隊)同步示例 */ public class OutpatientDemo { / * 當前就診資格,允許2人同時就診 */ private final static Semaphore permits = new Semaphore(2);

public void 就診() {
    permits.acquire();  // 如果當前有可用就診資格,則獲取當前就診資格,直接就診。否則,將進入大廳佇列排隊等候。
    try {
        // ... 就診中
    } finally {
        permits.release(); // 就診結束離開後,釋放資格。
    }
}

} ```

那麼,當代碼執行permits.acquire()時發生了什麼?我們接著往下看。

(二)資源搶佔解析

Semaphore在執行permits.acquire()時分兩部分完成,一部分在Semaphore中完成,一部分則由AQS完成。

1. Semaphore部分原始碼解析

以下是Semaphore中關於acquire()的核心原始碼,為了減少其他程式碼對你的影響,我們已經刪除了不必要的註釋和其他程式碼,僅保留和acquire()相關的程式碼。

在程式碼中,我們可以看到Semaphore支援公平和非公平兩種模式,在ReentrantLock的示例中我們使用的非公平模式,而在此我們將示例Semaphore的公平模式,但其實兩者差別並不大。關於下面的原始碼部分,有幾個關鍵點需要我們注意:

  • 預設情況下Semaphore使用的是非公平模式;
  • permits.acquire()執行時,呼叫的是AQS中的sync.acquireSharedInterruptibly(1)方法,而這個方法又會反過來呼叫Semaphore中的tryAcquireShared方法;
  • 原始碼中acquire()acquire(1)tryAcquire()acquireUninterruptibly()等都只不過是中斷、超時等不同的變種,其核心思路不變,不要有“亂花漸欲迷人眼”的錯覺。

```java public class Semaphore implements java.io.Serializable { private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
        setState(permits);
    }
}

static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }
            // 公平模式下獲取共享資格
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

  // 構建訊號量,並初始化資格總數
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
    // 獲取資格,獲取失敗後等待
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
    // 釋放資格
public void release() {
    sync.releaseShared(1);
}

} ```

2. AQS部分原始碼解析

AQS在處理共享資源申請時,也有很多不同的變種方法,比如中斷、超時等,但其核心思路一致,所以這裡使用acquireSharedInterruptibly來講解,這個方法也是Semaphore呼叫的方法。

acquireSharedInterruptibly方法的原始碼如下所示,主要表達了兩層含義: * 如果當前執行緒中斷,則丟擲異常; * 如果Semaphore中的tryAcquireShared返回的結果小於0,則意味著可以繼續,將進入AQS佇列處理邏輯。

java public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 存在可用資源,進入共享獲取邏輯 doAcquireSharedInterruptibly(arg); } AQS共享模式下的佇列處理邏輯和獨佔模式下的處理邏輯總體相似,其核心差異在於addWaiter(Node.SHARED)setHeadAndPropagate(node, r)兩行程式碼,前者標記了當前節點為共享模式,而後者則在將自己設定為頭部後,同時喚醒後繼節點。那麼,喚醒後繼節點是什麼意思?

java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 新節點設定為共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 呼叫子類的方法 int r = tryAcquireShared(arg); if (r >= 0) { // 設定頭部和傳播狀態 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

這是因為,在AQS的獨佔模式中,資源state同時僅允許一個執行緒搶佔,所以除了搶佔成功的節點,其他節點均處理等待狀態。但是,在AQS的共享模式中,雖然當前執行緒搶佔了資源,但它搶佔的僅是部分資源,還可能有剩餘資源可被其他執行緒搶佔,所以它要通過setHeadAndPropagate(node, r)喚醒其他節點執行緒

java private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果首節點狀態是SIGNAL,則需要unpark喚醒下個節點執行緒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 迴圈檢查,多次確認 // 喚醒首節點下個節點執行緒 unparkSuccessor(h); } // ws==0時,將會嘗試將ws設定為Node.PROPAGATE,這樣setHeadAndPropagate讀到ws<0時,就會喚醒後繼節點 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // 迴圈檢查,防止頭部發生變化 break; } }

3. 總體流程圖示

image-20220512180047272

(三)資源釋放解析

1. Semaphore部分原始碼解析

Semaphore通過release()執行資源釋放,如下原始碼所示。release()直接呼叫AQS中的releaseShared(),當然這並不是說釋放資源都是AQS的事而與Semaphore無關,因為releaseShared()中會呼叫Semaphore中的tryReleaseShared()

```java public class Semaphore implements java.io.Serializable { private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
        setState(permits);
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // 如果傳入的releases數值為負,則丟擲異常
                throw new Error("Maximum permit count exceeded");
            // 更新最新值
            if (compareAndSetState(current, next))
                return true;
        }
    }
}

public void release() {
    sync.releaseShared(1);
}

} ```

2. AQS部分原始碼解析

下面是releaseShared()方法的原始碼,可以看到它只做了兩件事:一是呼叫模板方法tryReleaseShared(),二是呼叫doReleaseShared(),前者由子類Semaphore提供,後者由AQS提供。

java public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }

doReleaseShared()方法是AQS共享模式下的關鍵方法,其實它在前面的setHeadAndPropagate()方法中也有引用,其關鍵原始碼如下。

java private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果首節點狀態是SIGNAL,則需要unpark喚醒下個節點執行緒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 迴圈檢查,多次確認 // 喚醒首節點下個節點執行緒 unparkSuccessor(h); } // ws==0時,將會嘗試將ws設定為Node.PROPAGATE,這樣setHeadAndPropagate讀到ws<0時,就會喚醒後繼節點 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // 迴圈檢查,防止頭部發生變化 break; } }

3. 總體流程圖示

image-20220510180435980

六、理解AQS中的條件同列

前述內容所講的都是AQS中的同步佇列,接下來我們再來探討AQS中的條件佇列。為了便於理解,我們仍然以醫院就診為例。

在醫院候診排隊時,正常情況下按照隊伍排隊即可。然而,凡事都有例外,比如防疫規定沒有做核酸的不能到大廳排隊,要現在外面做核酸並在結果出來後才能排隊。此時,就會出現兩個佇列:一是按先來後到的正常候診佇列,二是核酸結果等待佇列

image-20220519164637626

這樣的醫院排隊機制相信你可以理解。其實,這種機制不僅在現實中存在,在軟體中也存在。作為強大的同步器,AQS所提供的條件佇列正是為了解決此類問題。我們將可以將正常候診的佇列稱為同步佇列,而需要等待核酸結果出來後才能進入排隊的稱為條件佇列

所以,基於目前的理解,我們可以進一步完善AQS的整體結構:同步狀態+同步佇列+條件佇列,如下圖所示。

AQS的條件佇列是單向佇列,由ConditionObject+Node組成,ConditionObject中定義了佇列的頭部(firstWaiter)尾部(lastWaiter),並且頭部和尾部都是Node型別。和AQS中的同步佇列類似,條件佇列也有著廣泛的使用。

image-20220512180900730

比如,ReentrantLock中的newCondition()方法所呼叫的就是AQS中的方法,而ArrayBlockingQueue則使用了ReentrantLock這一方法,相關使用如下所示。

  • ReentrantLock中的方法

java public Condition newCondition() { return sync.newCondition(); // 建立新的等待條件 }

  • ReentrantLock所呼叫的AQS方法: JAVA final ConditionObject newCondition() { return new ConditionObject(); // 構建等待佇列 }
  • ArrayBlockingQueue中所使用的ReentrantLock中的方法,比如lock.newCondition(): java public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); // 等待佇列的具體場景應用 notFull = lock.newCondition(); }

  • ArrayBlockingQueue中通過notFull.await()將當前執行緒放入條件佇列,等待喚醒。 java public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); // 條件佇列中的等待 enqueue(e); } finally { lock.unlock(); } }

需要注意的是,同一執行緒不可以同時處於同步佇列和條件佇列中當執行緒進入條件佇列時,將自動從原來的同步佇列中出隊。限於篇幅,關於條件佇列的更多原始碼和流程在此不做更多解釋,讀者可以自行探索。

小結

正文到此結束,恭喜你又上了一顆星✨

在本文中,我們首先釐清了同步的概念,無論是現實生活還是軟體設計,同步都是廣泛的存在,而從生活中理解軟體中的設計相對較為容易。對於同步問題的解決,佇列是常被採用的方案。但是在軟體設計中,佇列的設計需要考慮到公平性、效能和擴充套件性等多個維度,所以雖然佇列是AQS的核心元件之一,但是對CLH佇列進行了適當的改造,以更好地適配AQS的設計理念和需求。因此,理解AQS的核心在於理解變種的CLH佇列,包括它的設計理念、資料結構組成,以及出隊和入隊等完整過程,所以我們在開篇引入並介紹了CLH佇列。

在本文的第四和第五部分,我們以ReentrantLock和Semaphore為例,介紹了AQS獨佔模式和共享模式下的入隊和佇列的變化形態,重點還是在於幫助理解CLH佇列。而在第六部分,我們介紹了似乎鮮為人知但同樣重要的條件佇列。

本文整體篇幅較長,內容較多。然而,在理解AQS時,我們不要深陷冗長的文章和原始碼中。首先要清楚的並非AQS是什麼和它的工作原理,而是要先搞清楚AQS所解決的是什麼問題,針對問題AQS提出了怎樣的方案。如此,才能抓住AQS的核心脈絡,理解它的本質

另外,作為成熟的同步器,AQS提供了完善的各種同步機制,JDK中也提供了多樣的同步實現,比如ReentrantLock、Semaphore和CountDownLatch等。因此,在編碼中需要使用同步機制時,應首先考慮現有的穩定的同步方案,其次再考慮自由地自主實現

夫子的試煉

  • 基於AQS,設計自己的同步器:實現一個佇列,三個視窗同時核酸取樣。

延伸閱讀與參考資料

  • http://tutorials.jenkov.com/java-concurrency/anatomy-of-a-synchronizer.html
  • https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
  • https://blog.51cto.com/u_12302616/3230929
  • https://www.cnblogs.com/xijiu/p/14396061.html
  • https://www.javarticles.com/2012/10/abstractqueuedsynchronizer-aqs.html
  • https://www.infoq.cn/article/bvpvyvxjkm8zstspti0l
  • https://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/CLH.pdf

常見面試題

  • 說說自己對 AQS 的理解?
  • 多個執行緒通過鎖請求共享資源,獲取不到鎖的執行緒怎麼辦?
  • 獨佔模式和共享模式有哪些區別?
  • 同步佇列中,執行緒入、出同步佇列的時機和過程?
  • 為什麼 AQS 有了同步佇列之後,還需要條件佇列?
  • 如果一個執行緒需要等待一組執行緒全部執行完之後再繼續執行,有什麼好的辦法麼?是如何實現的?

關於作者

專注高併發領域創作。人氣專欄《王者併發課》、小冊《高併發秒殺的設計精要與實現》作者,關注公眾號【MetaThoughts】,及時獲取文章更新和文稿。


如果本文對你有幫助,歡迎點贊關注監督,我們一起從青銅到王者