多圖詳解AQS

語言: CN / TW / HK

AQS 的全稱是 AbstractQueuedSynchronizer ,從字面理解它就是抽象的佇列同步器。它是可重入鎖、各種同步工具類(共享鎖)和條件等待喚醒機制實現的基石。

AQS 有一個重要的屬性 state,它的值直接關係著其他執行緒能否獲取到鎖。

如果我們看過可重入鎖、各種同步工具類(共享鎖)的原始碼,會發現這些鎖的關注點都在於通過 AQS 的 state 值或者能否通過 CAS 修改 state 的值來判斷當前執行緒能否獲取到鎖(這裡判斷是否能獲取到鎖都是靠 AQS 的子類 Sync 和 Sync的子類實現的,而這些子類的具體方法是鎖自己去實現的——歸根到底這部分就是鎖來實現的)。如果獲取鎖成功,直接扣減 AQS 的 State 值,不會涉及到 AQS。但如果當前執行緒獲取鎖失敗,那麼剩下的包括阻塞喚醒執行緒、重新發起獲取鎖之類的操作全都都會扔給 AQS 。簡單來說就是 AQS 包攬了同步機制的各種工作。這就是為什麼理解了 AQS 再去理解各種鎖就會非常容易,它的重要性也就不言而喻了。

下圖就是執行緒獲取鎖的大致流程:

下面就是使用 AQS 實現的最簡單的獨佔鎖,從程式碼也可以看出 AQS 大大降低了開發鎖的難度:

class Mutex {
    
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.tryAcquire(1);
    }

    public void unlock() {
        sync.tryRelease(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

}

AQS 的在 ReentrantLock 中的應用

各種鎖對於 AQS 的使用方式大致相同,這裡以 ReentrantLock 為例來講解。

ReentrantLock 的實現比上面的例子會複雜一點,但大體思路是相同的。

ReentrantLock 並不是直接繼承自 AQS,它實現了 Lock 的介面並對外提供鎖的各種功能。它通過內部的 AQS 子類 Sync 來使用 AQS 的功能,這樣設計的好處在於鎖的功能和同步器的功能劃分更清晰,方便擴充套件和維護。

由於 ReentrantLock 支援公平鎖和非公平鎖, Sync 又有兩個分別實現了公平鎖和非公平鎖功能的子類 FairSyncNonfairSync 。繼承關係如下圖所示:

這裡著重講一下 ReentrantLock 的 lock()unlock 方法。 lock() 方法內部就一句程式碼``sync.lock(); 實際是呼叫的 **FairSync** 或 **NonfairSync** 的 lock() 方法,而 unlock 方法是直接通過 **Sync** 類來呼叫 AQS 的 release()`方法。

public void lock() {
	sync.lock(); // 實際是呼叫的 FairSync 或 NonfairSync 的`lock()`方法
}

public void unlock() {
	sync.release(1); // AQS的方法
}

下面再看看 FairSyncNonfairSynclock() 方法的具體實現:

FairSync

final void lock() {
	acquire(1); // AQS的方法
}

NonfairSync

final void lock() {
	if (compareAndSetState(0, 1)) // 嘗試直接獲取鎖
		setExclusiveOwnerThread(Thread.currentThread()); // 獲取鎖成功後設置當前執行緒為獨佔執行緒
	else
		acquire(1); // AQS的方法
}

從上面的程式碼可以看出,加鎖解鎖其實本質都是去呼叫 AQS 的 acquire()release() 方法。這兩個方法在後面會詳細講解。

SyncSync 的子類裡還有兩個重要的方法: tryAcquire()tryRelease() ,它們都是AQS 為獨佔鎖提供的勾子方法,分別代表嘗試獲取鎖和嘗試釋放鎖。其中 tryRelease() 是由 Sync 來實現的, tryAcquire() 是由 Sync 的子類來實現的。這點其實也比較好理解,ReentrantLock 支援公平鎖和非公平鎖,這兩種鎖的差異就體現的嘗試獲取鎖這裡,而釋放鎖的邏輯是一致的。由於 ReentrantLock 不是本文的重點,這兩個方法就不詳細說了。

ConditionObject

條件等待和條件喚醒功能一般都是 ReentrantLock 與 AQS 的內部類 ConditionObject 配合實現的。一個 ReentrantLock 可以建立多個 ConditionObject 例項,每個例項對應一個條件佇列,以保證每個例項都有自己的等待喚醒邏輯,不會相互影響。

條件佇列裡的執行緒對應的節點被喚醒時會被放到 ReentrantLock 的同步佇列裡,讓同步佇列去完成喚醒和重新嘗試獲取鎖的工作。可以理解為條件佇列是依賴同步佇列的,它們協同才能完成條件等待和條件喚醒功能。

AQS 的構成

講完應用,下面講講AQS 的構成。

AQS 的繼承關係如下圖所示:

從圖中可以看出 AQS 繼承了另外一個抽象類 AbstractOwnableSynchronizer,這個類的功能其實就是持有一個不能被序列化的屬性 exclusiveOwnerThread ,它代表獨佔執行緒。在屬性中記錄持有獨佔鎖的執行緒的目的就是為了實現可重入功能,當下一次獲取這個鎖的執行緒與當前持有鎖的執行緒相同時,就可以獲取到鎖,同時 AQS 的 state 值會加1。

圖中紅線部分標識了 AQS 的兩個內部類,一個是 Node, 一個是 ConditionObject。

Node

Node就是 AQS 實現各種佇列的基本組成單元。它有以下幾個屬性:

  • waitStatus: 代表節點狀態:CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0(初始狀態)
  • prev: 代表同步佇列的上一個節點
  • next: 代表同步佇列的下一個節點
  • thread: 節點對應的執行緒
  • nextWaiter: 在同步佇列裡用來標識節點是獨佔鎖節點還是共享鎖節點,在條件佇列裡代表條件條件佇列的下一個節點

佇列

AQS 總共有兩種佇列,一種是同步佇列,代表的是正常獲取鎖釋放鎖的佇列,一種是條件佇列,代表的是每個 ConditionObject 對應的佇列,這兩種佇列都是 FIFO 佇列,也就是先進先出佇列。

同步佇列

而同步佇列的節點分為兩種,一種是獨佔鎖的節點,一種是共享鎖的節點,它們唯一的區別就是 nextWaiter 這個指標的值。如果是獨佔鎖的節點,nextWaiter 的值是 null,如果是共享鎖的節點,nextWaiter 會指向一個靜態變數 SHARED 節點。獨佔鎖佇列和共享鎖佇列如下圖所示:

條件佇列

條件佇列是單鏈,它沒有空的頭節點,每個節點都有對應的執行緒。條件佇列頭節點和尾節點的指標分別是 firstWaiter 和 lastWaiter ,如下圖所示:

waiteStatus

講完佇列我們來著重看一下節點的這個屬性,它代表著節點的等待狀態。這個屬性非常重要,如果不理解它,後面 AQS 原始碼部分也會很難理解。

首先看一下 waiteStatus 的取值:

  • CANCELLED 取消狀態,值為1
  • 0 初始狀態
  • SIGNAL 通知狀態,值為 -1
  • CONDITION 條件等待狀態,值為 -2
  • PROPAGATE 傳播狀態,值為 -3

這些狀態我們一個一個看。

CANCELLED代表著取消狀態,它的值是 1,注意這些狀態裡只有它的值是大於 0 的,所以原始碼裡判斷是取消狀態是直接通過 waiteStatus 值是否大於 0 來判斷。

如果 waiterStatus 的值為 0,有兩種情況:1、節點狀態值沒有被更新過(同步佇列裡最後一個節點的狀態);2、在喚醒執行緒之前頭節點狀態會被被修改為 0。

SIGNAL代表著通知狀態,這個狀態下的節點如果被喚醒,就有義務去喚醒它的後繼節點。這也就是為什麼一個節點的執行緒阻塞之前必須保證前一個節點是 SIGNAL 狀態。

CONDITION代表條件等待狀態,條件等待佇列裡每一個節點都是這個狀態,它的節點被移到同步佇列之後狀態會修改為 0。

PROPAGATE傳播狀態,在一些地方用於修復 bug 和提高效能,減少不必要的迴圈。

park() 和 unpark()

講 AQS 原始碼之前有一個重要的概念需要理解一下,那就是 Unsafe 這個類的 park()unpark() 方法。

注意在 AQS 裡並沒有直接呼叫 Unsafe 的這兩個方法,而是通過 LockSupport 間接呼叫的 Unsafe 的這兩個方法,LockSupport 裡面封裝了一些引數來簡化呼叫過程。

Unsafe 的這兩個方法其實就是對許可的管理, park() 方法是讓執行緒去獲取一個許可,如果獲取失敗就阻塞當前執行緒, unpark() 方法是釋放一個許可,如果當前執行緒是阻塞的,會喚醒當前執行緒。

這個許可簡單理解就像一個人要去過一個城關,規定有令牌才能過,沒有令牌就得等著。 park() 方法就是城關的守衛來檢查你的令牌,你能拿出來就過去了,沒拿出來就得原地等著。 unpark() 方法就是有人送來一塊令牌,如果發現你已經有了,就不送了,如果發現你正好沒有,就送給你。而如果這個時候你剛好等在城門口,那麼你順手就把剛得到的令牌給守衛,守衛就放你過去了。

AQS 原始碼分析

AQS 原始碼分析分為三部分:獨佔鎖部分、共享鎖部分和條件等待條件通知部分。

獨佔鎖部分

acquire()

首先是 acquire() 方法,下面是原始碼:

public final void acquire(int arg) {
	if (!tryAcquire(arg) &&
			acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 這裡 Node.EXCLUSIVE 的值是 null
			selfInterrupt();
}

這裡 tryAcquire(arg) 方法是由具體的鎖來實現的,這個方法主要是嘗試獲取鎖,獲取成功就不會再執行其他程式碼了,這個方法結束。獲取失敗會進入下一步 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 。這裡有個方法巢狀,我們先看 addWaiter(Node.EXCLUSIVE) 方法:

private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	// Try the fast path of enq; backup to full enq on failure
	Node pred = tail;
	if (pred != null) { // 如果尾節點不為空,就把節點放在尾節點後面並設定為新的尾節點
		node.prev = pred;
		if (compareAndSetTail(pred, node)) { // 嘗試把節點設定為新的尾節點
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}

這個方法前半段比較好理解,先建立一個節點,如果有尾節點,就讓這個節點指向當前的尾節點,並把它設定成新的尾節點,設定失敗也沒關係,後面會進入一個重要方法 enq() 。如果當前沒有尾節點,會直接進入到 enq() 方法。下面是 enq() 的原始碼:

private Node enq(final Node node) {
	for (;;) {
		Node t = tail;
		if (t == null) { // 如果尾節點為空,那麼佇列也為空,新建一個頭節點,讓 head 和 tail 都指向它
			if (compareAndSetHead(new Node()))
				tail = head;
		} else { // 如果有尾節點,把傳入的節點放入隊尾
			node.prev = t;
			if (compareAndSetTail(t, node)) {
				t.next = node;
				return t;
			}
		}
	}
}

其實這個方法就做兩件事:

1、判斷如果沒有尾節點,那麼佇列肯定是空的,也不會有頭節點,這個時候就要去新增一個空節點,通過 CAS 將這個空節點設定成頭節點,然後 tail 指標也指向這個空節點。

2、如果有尾節點,就把當前節點放在尾節點後面,然後通過 CAS 嘗試將 tail 指標指向這個節點,直到成功為止。

下圖展示了初始佇列為空時節點的變化,佇列不為空的情況也類似於下圖單節點到雙節點的情況,都是在尾節點後續追加節點。

看完了 addWaiter() 方法,接下來就是另一個非常重要的方法 acquireQueued() ,原始碼如下:

final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {
			final Node p = node.predecessor(); // 核驗並獲取前一個節點,如果前一個節點不存在,直接拋異常
			if (p == head && tryAcquire(arg)) { // 如果前一個節點就是頭節點,讓這個節點的執行緒嘗試獲取鎖
				setHead(node); //獲取鎖成功後把當前節點設定為頭節點
				p.next = null; // 將之前頭節點的 next 指標置空,後面 GC 時會回收這個節點
				failed = false;
				return interrupted;	
      }
			if (shouldParkAfterFailedAcquire(p, node) && // 判斷是否應該阻塞當前執行緒(核心是判斷並修正前面節點的 waitStatus)
				parkAndCheckInterrupt()) // 阻塞當前執行緒、返回並清除中斷標記
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

這個方法也分兩種情況:

1、當前活躍的執行緒對應的節點就是同步佇列的第二個節點,那麼就讓當前執行緒去嘗試獲取鎖,直到成功為止,如果獲取鎖成功就把這個節點設定成頭節點;

2、當前活躍執行緒是第三個或者更後面的節點,那麼就會進入判斷是否需要阻塞並進而阻塞的邏輯。

出現第一種情況有兩種可能:

1、這個節點剛入佇列而且這個佇列只有頭節點和這個節點;

2、本來這個節點是排在後面的,前面的節點一個個被喚醒之後它的位置也不斷往前移,最終它作為第二個節點也被喚醒了。

整個方法的大致流程如下圖所示:

下面重點講一下第二種情況,也就是涉及阻塞執行緒的情況。

先看看 shouldParkAfterFailedAcquire() 原始碼:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	if (ws == Node.SIGNAL) // 判斷前面節點狀態為 SIGNAL ,返回 true
		return true;
  if (ws > 0) { // 如果前面的節點狀態為取消(CANCEL值為1),就一直向前查詢,直到找到狀態不為取消的節點,把它放在這個節點後面
  	do {
    	node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    	pred.next = node;
  } else {
  	compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 如果前面節點不是取消也不是 SIGNAL 狀態,將其設定為 SIGNAL 狀態
  }
  return false;
}

這個方法主要是判斷前一個節點的狀態:

1、如果是 SIGNAL 就返回 true 表示可以阻塞當前執行緒;

2、如果前面節點狀態大於零,也就是取消狀態,那麼一直往前移直到它前面的節點不是取消狀態;

3、如果不是前兩種狀態,那麼把前一個節點狀態設定成 SIGNAL 。

除了第一種狀態,後面兩種狀態都會返回 false,後面經過迴圈再次進去這個方法。

當前面一個方法返回 true 時,就會進入下一個判斷,也就是 parkAndCheckInterrupt() 方法,下面是方法原始碼:

private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this); // 阻塞當前執行緒
  return Thread.interrupted(); // 返回並清除當前執行緒中斷狀態
}

這個方法比較簡單,就做兩件事:阻塞當前執行緒和返回並清除中斷狀態。

上面就是 acquire() 部分的原始碼,接著講與它對應的另一個方法 release()

release()

下面是 release() 的原始碼:

public final boolean release(int arg) {
	if (tryRelease(arg)) { // 嘗試釋放鎖,如果成功則喚醒後繼節點的執行緒
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h); // 喚醒後面節點中第一個非取消狀態節點的執行緒
		return true;
	}
	return false;
}

在方法的開始做了一個判斷:

1、如果當前執行緒釋放鎖失敗,就直接返回了;

2、如果釋放鎖成功,那麼就會接著判斷頭節點是否為空和頭節點 waitStatus 是否不為 0 。

這裡判斷頭節點狀態是一個比較重要的點。為什麼頭節點的狀態一定不能為 0 呢?從後面要講到原始碼可以知道,在喚醒頭節點的後繼之前會做一個將頭節點狀態置為 0 的操作(雖然這個操作不一定成功)。如果頭節點的狀態為 0 了,說明正在釋放後繼節點,這時候也就不再需要釋放了,直接返回 true。

頭節點狀態判斷之後,就會進入到釋放後繼節點這一步,也就是 unparkSuccessor() 方法:

private void unparkSuccessor(Node node) {
	int ws = node.waitStatus; // 這裡取的是頭節點的狀態
  if (ws < 0)
  	compareAndSetWaitStatus(node, ws, 0); // 嘗試將頭節點狀態設定為 0,不保證能設定成功
  Node s = node.next;
  if (s == null || s.waitStatus > 0) { // 如果後面這個節點狀態為取消,那麼就找到一個位置最靠前的非取消狀態的節點
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    	if (t.waitStatus <= 0)
      	s = t;
  }
  if (s != null)
  	LockSupport.unpark(s.thread); // 喚醒符合條件的後繼節點
}

這個方法的核心目標是喚醒頭節點符合條件的後繼節點。因此前面做了一個判斷,如果後面這個節點不是取消狀態也不為空,那麼就直接喚醒它。如果後面的節點不符合要求,那麼就開始從後往前遍歷,找到一個最靠前的並且是非取消狀態的非空節點,然後喚醒它對應的執行緒。

整個方法的流程如下圖所示:

共享鎖部分

acquireShared()

瞭解完獨佔鎖的加鎖和解鎖的邏輯,接著來講講共享鎖的加鎖和解鎖邏輯。

下面是 acquireShared() 方法的原始碼:

public final void acquireShared(int arg) {
	if (tryAcquireShared(arg) < 0)
		doAcquireShared(arg);
}

這個方法就兩行:第一行判斷嘗試獲取鎖的返回值是否小於0,這裡的返回值是指當前訊號量減去傳入的訊號量的結果,小於0就代表當前訊號量不足,獲取鎖失敗,這時候就需要 AQS 接管了;第二行是執行阻塞和喚醒後獲取鎖的方法。

下面是 doAcquireShared() 方法的原始碼:

private void doAcquireShared(int arg) {
	final Node node = addWaiter(Node.SHARED); // 1、共享節點入隊
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {
			final Node p = node.predecessor();
			if (p == head) {
        int r = tryAcquireShared(arg); // 2、嘗試獲取共享鎖(相當於嘗試扣減訊號量)
        if (r >= 0) {
          setHeadAndPropagate(node, r); // 3、設定頭節點並且做一些判斷,符合條件會喚醒下一個節點
          p.next = null; // help GC
          if (interrupted)
            selfInterrupt();
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node)
        parkAndCheckInterrupt()) // 執行緒會阻塞在這個位置,被喚醒後再繼續迴圈
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

這個方法大部分程式碼與前面講的 acquireQueued() 方法是相同的。這裡著重講不同的地方。

首先是標記的第 1 處 final Node node = addWaiter(Node.SHARED); 這裡引數是傳的靜態常量 SHARED ,這個值會賦給新生成節點的 nextWaiter 。正如前面說的,通過 nextWaiter 的值我們就能判斷這個節點是獨佔鎖的節點還是共享鎖的節點。

然後是標記為 2 的這行程式碼 int r = tryAcquireShared(arg); 這代表嘗試獲取鎖之後的值,如果剩下的訊號量不為負,那就代表獲取鎖成功了,就會進入到標識為 3 的這個方法。

下面我們來看看標記為 3 的 setHeadAndPropagate(node, r) 方法的原始碼:

private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; // Record old head for check below
  setHead(node); // 1
  if (propagate > 0 || h == null || h.waitStatus < 0 ||  // 2
      (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
      doReleaseShared();
  }
}

這個方法程式碼也不多,主要是兩塊內容:第一個是 setHead(node) 方法,這個方法讓第二個節點變成頭節點,置空之前頭節點的部分指標;第二塊內容做了大量的判斷,然後如果符合條件會執行 doReleaseShared(); ,這個方法也是後面重點要講的喚醒共享鎖同步佇列執行緒的方法。

這裡詳細講一下第二塊內容做的這些判斷:

propagate > 0
h == null
h.waitStatus < 0
(h = head) == null
h.waitStatus < 0
s == null
s.isShared()

這個方法裡實現了鏈式喚醒:當一個執行緒被喚醒並獲取到鎖,如果滿足條件就會去喚醒其他執行緒來嘗試獲取鎖,這種喚醒能一直傳遞下去,使得共享鎖獲取鎖的效率大大提升。

releaseShared()

接著講另一個重要的方法 releaseShared() ,下面是原始碼:

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}

這個方法除了返回值,核心程式碼也只有兩行:第一行代表嘗試釋放鎖,釋放失敗就直接返回了,釋放成功就會執行喚醒後繼節點執行緒操作;第二行就是具體的喚醒執行緒的方法;

下面是 doReleaseShared() 方法的原始碼:

private void doReleaseShared() {
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;            // loop to recheck cases
        unparkSuccessor(h);
      }
      else if (ws == 0 &&
               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;                // loop on failed CAS
    }
    if (h == head)                   // loop if head changed
      break;
  }
}

在這個方法的迴圈裡對頭節點做了大量的判斷,頭節點的狀態滿足條件才會執行喚醒操作,我們挨個來看看這些判斷的作用:

ws == Node.SIGNAL
!compareAndSetWaitStatus(h, Node.SIGNAL, 0)
ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)

unparkSuccessor() 方法前面已經講過,這裡就不重複講了。

條件等待和條件通知

條件等待和條件通知功能主要由 AQS 內部類 ConditionObject 的兩個重要的方法: await()signal() 來實現。

await()

await() 方法正如字面意思一樣,就是等待。它與 Object 物件的 wait() 方法不同的是,Object 的 wait() 方法呼叫後,任何物件呼叫 notify 都能喚醒它,而 await() 方法呼叫後,只有呼叫 await() 方法的例項呼叫的 notify() 方法才能喚醒它,因此 await() 是一個條件等待方法。

方法的原始碼如下:

public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  Node node = addConditionWaiter(); // 生成一個新的節點新增到條件佇列
  int savedState = fullyRelease(node); // 呼叫同步佇列釋放鎖的方法
  int interruptMode = 0;
  while (!isOnSyncQueue(node)) { // 節點是否已經轉移到了同步佇列中
    LockSupport.park(this); // 沒有被轉移就阻塞執行緒
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 根據執行緒中斷狀態設定 interruptMode 
      break;
  }
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 呼叫同步佇列阻塞和嘗試獲取鎖的方法
    interruptMode = REINTERRUPT;
  if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters(); // 把節點等待狀態(waitStatus)不為 CONDITION 的節點移除
  if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode); // 根據不同的中斷模式決定是丟擲中斷異常還是重新標記中斷
}

需要特別注意的是這個方法是支援中斷的,而且方法中很多判斷和方法都是與中斷有關的,具體哪些地方什麼情況會丟擲中斷異常這裡不詳細說,這個不是本文的重點。

首先講講 addConditionWaiter() 這個方法,原始碼如下:

private Node addConditionWaiter() {
  Node t = lastWaiter;
  // If lastWaiter is cancelled, clean out.
  if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters(); // 把所有節點等待狀態不為 CONDITION 的節點移除
    t = lastWaiter;
  }
  Node node = new Node(Thread.currentThread(), Node.CONDITION); // 新建一個條件佇列的節點
  if (t == null)
    firstWaiter = node;
  else
    t.nextWaiter = node;
  lastWaiter = node;
  return node;
}

這個方法做了兩件事:

1、如果佇列不為空而且最後一個節點等待狀態異常,就做一個全佇列掃描,去掉異常的節點;

2、把節點入隊,這裡要做一個判斷:如果佇列為空,把新節點作為頭節點,如果佇列非空,把新節點放在隊尾。

接著是 isOnSyncQueue() 方法,原始碼如下:

final boolean isOnSyncQueue(Node node) {
  if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
  if (node.next != null) // If has successor, it must be on queue
    return true;
  return findNodeFromTail(node);
}

這個方法主要是用來判斷當前執行緒的節點是否已經在同步隊列了,這個方法涉及三個判斷:

1、如果節點等待狀態是 CONDITION 或者節點的 prev 指標為空(節點在同步佇列這個指標才有值),那麼一定不是在同步佇列;

2、如果節點的 next 指標不為空,那麼一定在同步佇列;

3、遍歷同步佇列,看佇列中有沒有節點與這個節點相同。

signal()

signal() 方法是與 await() 方法對應的,一個負責通知,一個負責等待。

下面是 signal 方法的原始碼:

public final void signal() {
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  Node first = firstWaiter;
  if (first != null)
    doSignal(first);
}

isHeldExclusively() 這個方法返回的是該執行緒是否正在獨佔資源,如果不是的話會丟擲異常。

整個 signal() 方法的重點裡面呼叫的 doSignal() 方法,傳入的引數是頭節點。

下面是 doSignal() 的原始碼:

private void doSignal(Node first) {
  do {
    if ( (firstWaiter = first.nextWaiter) == null) // 判斷頭節點的下一個節點是否為 null
      lastWaiter = null; // 如果佇列只有頭節點,將 lastWaiter 指標置為 null
    first.nextWaiter = null; // 遷移節點前先將節點的 nextWaiter 置為 null
  } while (!transferForSignal(first) && // 把頭節點遷移到同步佇列中去
           (first = firstWaiter) != null); // 沒有遷移成功就重新獲取頭節點判斷不為空繼續迴圈
}

這個方法也沒有太複雜的內容,具體可以看看上面的註釋,這裡詳細講講 transferForSignal()

方法原始碼如下:

final boolean transferForSignal(Node node) {
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 將節點 waitStatus 通過 CAS 更新為 0
    return false; // 更新失敗說明節點等待狀態已經變了,返回 false 重新獲取頭節點然後重試
  Node p = enq(node); // 將節點放入同步佇列中
  int ws = p.waitStatus;
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 判斷前一個節點等待狀態,如果狀態是正常的賦值為 SIGNAL
    LockSupport.unpark(node.thread); // 前面節點狀態為取消或就會喚醒當前節點,避免後面沒辦法被喚醒
  return true;
}

整個遷移佇列變化如下圖所示:

總結

以上就是關於 AQS 的全部內容,看到這裡大家應該就會有一個直觀的感受:AQS 其實核心就是佇列,佇列又是為各種鎖服務的。瞭解這些佇列節點的阻塞喚醒時機對我們去了解各種鎖非常有幫助。