JAVA併發基石之AQS --- 心法篇

語言: CN / TW / HK

本文已參與「新人創作禮」活動,一起開啟掘金創作之路。


一、起源:

1、AQS是什麼?

全稱 AbstractQueuedSynchronizer 抽象佇列同步器,是java.util.concurrent.locks 包下的基礎元件, 是用於構建鎖和同步器的框架。

簡單的理解就是多執行緒環境下,AQS 封裝了一攬子對臨界資源執行緒安全的操作,以 ReentrantLock 來看: 獲取鎖?成功則鎖住,否則放入 FIFO 佇列等待獲取鎖(執行緒掛起或者說阻塞),鎖的釋放等這些底層操作都由 AQS 封裝;

許多同步器都可以通過 AQS 很容易並且高效的構造出來。

2、AQS在Java併發程式設計中扮演什麼角色?

java.util.concurrent 包下許多可阻塞類的底層實現,如 ReentrantLock, CountDownLatch, Semaphore, ReentrantReadWriteLock, SynchronousQueue 和 FutureTask 都是基於 AQS 構建。

可見搞懂 AQS 在閱讀 java 併發包下的其他元件時是多麼必要!!!

3、AQS核心是什麼?

包含三部分核心內容,狀態佇列、以及由具體工具類去實現的獲取/釋放等方法

java /** * The synchronization state. */ private volatile int state;

1)state狀態:

如上定義, 它的含義也會由具體的實現類不同而具有不同的含義,來看看幾個常用的同步器中的 state 含義: - ReentrantLock:state 狀態用於表示重入次數,重入1次 state 加1,釋放一次 state 減 1,state 為零表示沒有任何執行緒佔用鎖 - Semaphore 同步器:state 狀態用於儲存當前可用許可的數量,當嘗試拿一個許可併成功拿到時,相應的許可數量減去; 如果嘗試釋放併成功釋放許可時,許可數量將會增加 - CountDownLatch 同步器:和 Semaphore 和相似,state 狀態儲存的是當前的計數值,countDown 方法呼叫 release,從而導致計數值遞減, 並且當計數值為零時,解除所有執行緒的阻塞;await 呼叫 acquire,當計數器為零時,acquire 將立即返回,否則將阻塞 - FutureTask:state 狀態儲存任務的狀態,如正在執行、已完成或已取消 - ReentrantReadWriteLock:此實現有兩把鎖分別是讀鎖和寫鎖,state 狀態使用 16 位的狀態來表示寫入鎖的計數,使用另外 16 位的狀態來表示讀取鎖的計數

2)FIFO佇列 先進先出佇列: - 主要的作用是用來儲存等待的執行緒。比如很多執行緒都想要去搶奪這把鎖,但是大部分都是搶不到的,該如何處理這些執行緒呢?那就將它們放進佇列裡面,然後進行處理 - 當多個執行緒去競爭同一把鎖的時候,就需要排隊機制把那些沒能拿到鎖的執行緒串聯起來,當前面的執行緒釋放鎖之後,這個管理器就會挑選一個合適的執行緒來嘗試搶剛才被釋放的那把鎖。所以AQS就一直維護這個佇列,並把等待的執行緒都放進佇列裡面。 - 佇列內部是雙向連結串列,結構看起來簡單,但是要執行緒安全的維護這個佇列,也著實不容易....

3)獲取/釋放方法: - AQS 使用模版方法模式,將 tryAcquire,tryRelease 等具體獲取和釋放資源的方法提供給子類實現,這裡的資源主要針對的是state值的處理,來看一個例子就明白了:

```java /* *

Here is a non-reentrant mutual exclusion lock class that uses * the value zero to represent the unlocked state, and one to * represent the locked state. While a non-reentrant lock * does not strictly require recording of the current owner * thread, this class does so anyway to make usage easier to monitor. * It also supports conditions and exposes / class Mutex implements Lock, java.io.Serializable {

     // Our internal helper class
     // 這裡去繼承AQS並實現相關方法
     private static class Sync extends AbstractQueuedSynchronizer {
         // Reports whether in locked state
         @Override
         protected boolean isHeldExclusively() {
             return getState() == 1;
         }

         // 使用者實現
         // Acquires the lock if state is zero
         @Override
         public boolean tryAcquire(int acquires) {
             assert acquires == 1; // Otherwise unused
             if (compareAndSetState(0, 1)) {
                 setExclusiveOwnerThread(Thread.currentThread());
                 return true;
             }
             return false;
         }

         // 使用者實現
         // Releases the lock by setting state to zero
         @Override
         protected boolean tryRelease(int releases) {
             assert releases == 1; // Otherwise unused
             if (getState() == 0) {
                 throw new IllegalMonitorStateException();
             }
             setExclusiveOwnerThread(null);
             setState(0);
             return true;
         }

         // Provides a Condition
         Condition newCondition() { return new ConditionObject(); }

         // Deserializes properly
         private void readObject(ObjectInputStream s)
                 throws IOException, ClassNotFoundException {
             s.defaultReadObject();
             setState(0); // reset to unlocked state
         }
     }

     // The sync object does all the hard work. We just forward to it.
     private final Sync sync = new Sync();

     @Override
     public void lock()                { sync.acquire(1); }
     @Override
     public boolean tryLock()          { return sync.tryAcquire(1); }
     @Override
     public void unlock()              { sync.release(1); }
     @Override
     public Condition newCondition()   { return sync.newCondition(); }
     public boolean isLocked()         { return sync.isHeldExclusively(); }
     public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
     @Override
     public void lockInterruptibly() throws InterruptedException {
         sync.acquireInterruptibly(1);
     }
     @Override
     public boolean tryLock(long timeout, TimeUnit unit)
             throws InterruptedException {
         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
     }

 }
 //

```

上面通過繼承 AbstractQueuedSynchronizer 實現的非重入獨佔鎖,state=0 表示可以獲取鎖,state=1 表示次鎖已經被持有了, 可以看見通過重寫 tryAcquire,tryRelease 方法對具體資源的處理。

4、AQS核心原理

  • state:狀態量 也就是資源,實際獲取、釋放都是通過圍繞 state 狀態量判斷
     private volatile int state;
    
    • 等待佇列:實際是通過雙向連結串列實現的佇列,分別有隊頭引用 head, 隊尾引用 tail
    • head: 特別需要注意的是, head 對應的執行緒已經是獲得了資源,因此我們常說的喚醒是針對的除 head 節點之外的其他節點
    • 等待佇列:強調上一條,真正等待喚醒的節點是除 head 之外的其他節點
    • 等待佇列中被喚醒的節點只有獲取資源成功了才能設定成 head
    • 同時需要注意的是,只有前驅節點是 head 的節點才有機會嘗試獲取資源

java final Node p = node.predecessor(); if (p == head && tryAcquire(arg))

  • AQS大致的思路:定義 state 資源,作為能否獲取成功的依據,如果獲取失敗(那也得想辦法處理不是?),就放進 AQS 維護的一個等待佇列, 等待資源釋放了,就嘗試喚醒後繼節點(一般是這樣,但如果是 cancel 的需要排除佇列),被喚醒的節點嘗試去獲取資源,這個時候也可能會失敗的哈, 比如 ReentrantLock 的非公平鎖實現中,每次 tryAcquire 都要嘗試去獲取資源(這裡就和等待佇列裡被喚醒的節點是競爭關係),通過 CAS+自旋的方式保證執行緒安全性

  • Node節點定義:

```java static final class Node { // 表示當前節點在共享模式上等待 static final Node SHARED = new Node(); // 表示當前節點在獨佔模式上等待 static final Node EXCLUSIVE = null;

         // waitStatus有5個值,初始值=0
         // CANCELLED =  1
         // SIGNAL    = -1
         // CONDITION = -2
         // PROPAGATE = -3
         volatile int waitStatus;

         volatile Node prev; // 前驅節點

         volatile Node next; // 後繼節點

         volatile Thread thread; // 此節點對應的執行緒

         Node nextWaiter;

         ....

```

其中: - SIGNAL = -1,表明後繼節點需要被當前節點喚醒 - CANCELLED = 1,由於超時或中斷,該節點被取消,節點永遠不會離開此狀態。特別的是,具有取消節點的執行緒永遠不會再次阻塞 - CONDITION = -2,表示結點執行緒等待在 condition 上,當其他執行緒呼叫了 Condition 的 signal() 方法後,CONDITION 狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖 - PROPAGATE = -3,共享模式下,前驅節點不僅會喚醒其後繼節點,同時也可能喚醒後繼節點的後繼節點
- head,tail 節點:分別是隊頭,隊尾節點

```java /* * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. / // 等待佇列頭節點,只能通過方法setHead設定head,如果head節點存在, // 它的waitStatus狀態一定不會是CANCELLED private transient volatile Node head;

     /**
      * Tail of the wait queue, lazily initialized.  Modified only via
      * method enq to add new wait node.
      */
     // 等待佇列尾節點,只會在enq方法中新增到佇列
     private transient volatile Node tail;

```

二、原始碼分析

獨佔模式

1、acquire方法

忽略中斷的獨佔模式的入口方法 - 所謂忽略中斷,就是當執行緒發生中斷的情況僅僅只是記錄中斷狀態,而不會類似於丟擲中斷異常來響應,具體響應中斷可由實現類處理

java // arg引數表示的需要獲取的資源量 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

主要操作: - 首先通過 tryAcquire 嘗試獲取資源,如果獲取成功這裡就結束了;tryAcquire方式使用的模版方法模式來設計,由實現類去實現具體如何獲取資源這個操作 - 如果獲取失敗了,通過a ddWaiter 方法把當前執行緒封裝成一個 Node 節點放進等待佇列隊尾 - acquireQueued 核心方法,通常等待佇列裡的節點在嘗試獲取鎖失敗之後,都會阻塞在此放方法中(park 操作,也就是執行緒掛起), 當節點被喚醒時,又從被掛起的地方開始嘗試獲取資源;因為在被阻塞的過程中,此執行緒可能被中斷了,所以需要記錄中斷標誌(不立即響應),交給上層處理,acquireQueued 返回值就代表釋放已經中斷 - selfInterrupt 就是檢測到中斷後,設定中斷標誌

```java // 此操作會清除中斷標誌 Thread.interrupted()

    // 所以這裡需要重新設定中斷標誌
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

```

1.1、addWaiter

簡單來說就是將當前執行緒封裝成 Node 節點,新增至佇列尾部

```java /* * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node / private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 如果tail為空說明佇列還沒有初始化,需要通過enq進行處理 if (pred != null) { node.prev = pred; // 如果tail節點設定成功了就直接返回,否則通過enq處理(畢竟多執行緒環境CAS操作也有可能失敗嘛) if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

```

1.1.1 enq方法

自迴圈的方式將 node 加入佇列尾部,直到成功

java // 返回當前節點node的前驅節點 private Node enq(final Node node) { for (;;) { Node t = tail; // 如果還沒有tail節點,說明佇列還沒有初始化 if (t == null) { // Must initialize // 注意這裡先加入的是一個空Node節點,同時也沒有直接退出enq方法 // 也就是上面說的,除head之外 等待佇列裡的其他節點才是真正在阻塞狀態的節點 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // CAS設定成了就直接返回 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

這一通過自旋的方式保證能夠將 node 節點新增至等待佇列尾部

1.2、acquireQueued

獨佔不響應中斷模式嘗試獲取資源,如果此節點 node 的前驅節點是 head,那它才有資格去嘗試獲取資源,獲取失敗之後掛起執行緒,進入阻塞狀態

```java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 記錄中斷狀態,用做方法返回 boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 前驅節點 // 如果是前驅是head節點才去嘗試獲取資源 if (p == head && tryAcquire(arg)) { // 只有獲取資源成功之後才重新設定head為當前節點node setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 判斷獲取資源失敗之後應該掛起執行緒 if (shouldParkAfterFailedAcquire(p, node) && // 掛起執行緒,並在被喚醒之後檢查執行緒中斷狀態 parkAndCheckInterrupt()) interrupted = true; } } finally { // 如果此節點node在嘗試獲取資源的過程中出現了異常,那就直接從等待佇列裡面取消此節點 if (failed) cancelAcquire(node); } }
//

```

主要操作: - 如果當前 node 節點的前驅是 head 節點,那麼才有資格去嘗試獲取資源 - 如果獲取資源成功之後,才會設定 head 為當前節點 node - 在獲取資源失敗之後一般就會考慮是否需要掛起執行緒 - 掛起的執行緒在被喚醒之後就會繼續從掛起前的地方開始操作,這個時候需要檢查一下中斷標誌,如果中斷了需要記錄一下,方便返回

1.2.1、shouldParkAfterFailedAcquire

判斷是否應該掛起當獲取資源失敗之後

```java private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 這裡是拿到的是前驅節點的狀態,當前節點node需要確保前驅節點是SIGNAL狀態,因為只有這樣 // 前驅節點在釋放資源之後才會喚醒當前節點node,也就是當前節點才能放心的去掛起 if (ws == Node.SIGNAL) / * This node has already set status asking a release * to signal it, so it can safely park. / return true; if (ws > 0) { // 如果前驅節點被取消了,那不斷嘗試往前找未取消的前驅節點 // 這是一定能找到的,因為至少head節點是不可能被取消的 / * Predecessor was cancelled. Skip over predecessors and * indicate retry. / do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { / * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. / // 這裡並不會直接掛起當前節點,而是設定前驅節點的waitStatus為SIGNAL,表明在它釋放資源之後需要喚醒當前節點 // 為啥不直接掛起當前節點node呢?採用的做法是線上程掛起之前再嘗試一次獲取資源,如果成功就不需要掛起了 // 考慮這樣一個場景:佇列裡有一個head和一個剛加入佇列的節點a,節點a嘗試過一次獲取資源,但是失敗了,因為head節點還沒有釋放資源 // 在節點a判斷是否需要掛起之前,這個時候正好head節點釋放資源,這個時候節點a只要再嘗試一次獲取資源也就可以成功了 // 畢竟掛起和喚醒執行緒的消耗是高於自旋等待的消耗 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //

```

主要操作 - 當前節點node需要確保前驅節點是SIGNAL狀態後 它才放心把自己掛起(因為會有人喚醒自己嘛) - 需要注意的是:waitStatus大於0表示節點是取消狀態,只有小於等於0才是有效狀態 - 當前驅節點的ws大於0了說明前驅節點被取消了,需要為當前節點找到一個未被取消的前驅節點,這個前驅節點一定是能找到的,至少head節點是不可能被取消的 - 否則的話需要將前驅節點的狀態設定為SIGNAL,表明前驅節點釋放資源後需要通知到自己(也就是當前node節點);當前節點node再經過一輪嘗試獲取資源失敗後就可以直接掛起了

1.2.2、parkAndCheckInterrupt

掛起當前執行緒,並檢查中斷標誌

```java private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

```

藉助LockSupport#park將當前執行緒掛起,同時通過Thread.interrupted()檢查當前執行緒的中斷標誌,需要注意的是, 此方法會清除中斷標誌,因此如果需要儲存中斷狀態的話,在上層需要再重新設定

1.2.3、cancelAcquire

取消需要獲取資源的node

```java private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    // 找到未被取消的前驅節點
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // 如果當前節點node為tail,那就重新將pred設定為tail
    if (node == tail && compareAndSetTail(node, pred)) {
        // 設定成功之後,因為pred為tail節點,肯定是沒有後繼節點了
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                // 相當於將node節點從佇列裡面排除了
                compareAndSetNext(pred, predNext, next);
        } else {
            // 試想當前node節點是等待佇列的第一個節點(head除外),如果它被取消了,是不是得擔負起喚醒後繼節點的責任?
            // 假如節點node是head節點釋放資源後喚醒的一個節點,而節點node在獲取資源的時候發生了異常,node節點需要被取消
            // 但是傳播喚醒這個責任是不是就落到了node節點的頭上? (畢竟是head喚醒了你,而你又要跑路,你是不是得先把喚醒責任處理好了再跑呢?)
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
//

```

主要操作: - 什麼時候會被取消?從原始碼註釋來看,當超時或者響應中斷的時候會進行取消 - 一般情況下將取消的節點從連結串列中移除即可 - 如果當前node節點的前驅節點是head,自己(node)本來是被喚醒去獲取資源的,現在被取消要刪除了,後面的 "兄弟"誰去喚醒呢? 因此這種情況下,我(node)需要喚醒一個後面的"兄弟",才能安靜的離去.....

2、release

獨佔模式下的資源釋放

```java public final boolean release(int arg) { // 通過tryRelease去嘗試釋放資源 if (tryRelease(arg)) { Node h = head; // head不為空說明等待佇列裡面有待喚醒的節點,waitStatus=0表示後繼節點不需要喚醒, // 因此需要滿足h.waitStatus!=0才能去喚醒後繼節點 if (h != null && h.waitStatus != 0) // 嘗試喚醒後繼節點 unparkSuccessor(h); return true; } return false; }

```

主要操作: - 通過tryRelease嘗試去釋放資源,這是利用模版方式模式,具體由子類去實現 - 滿足待喚醒條件後,通過unparkSuccessor去喚醒

2.1、unparkSuccessor

喚醒後繼節點 ```java private void unparkSuccessor(Node node) { / * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. / int ws = node.waitStatus; // 如果ws < 0表明是正常狀態,置為0表示不需要喚醒後繼節點這類操作 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // ws大於0表示已經取消了,那就找到第一個未取消的節點來喚醒,不過這裡是通過從後往前掃的方式處理的
    // 通常情況下是喚醒node.next節點,但是考慮到節點被取消或者是null的情況,需要從後面往前掃
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 喚醒執行緒
    if (s != null)
        LockSupport.unpark(s.thread);
}

```

共享模式

此模式下需要搞清楚tryAcquireShared方法返回的三類數值 - 如果返回小於0的值:表示此次獲取資源失敗 - 如果返回等於0的值:表示此次獲取資源成功,但是資源已經用盡了,不能向後傳播喚醒操作了(因為沒資源可用的,只能等到釋放之後) - 如果返回大於0的值:表示此次獲取資源成功,同時下一次也是有可能成功獲取資源,喚醒操作向後傳播(畢竟有資源可用嘛)

需要注意的是:獲取資源成功之後會進行的setHead操作,也就是head會被改變,但並不是說之前head對應的節點已經釋放資源結束了,換句話說新head節點獲取資源執行的同時,舊head節點仍然可能在運作中,其攜帶的資源並未釋放

舉一個實際例子來看看: - 假設資源總量為10,現在有4個執行緒a,b,c,d想要獲取資源執行,需要的資源分別是5,3,3,4 - 這個時候假設a,b已經搶佔資源並運行了,佔用資源數為8,剩餘資源2;需要注意的是執行緒a,b可不是等待佇列的節點哈 - 此時執行緒c來了,想要獲取資源,但是 3 > 2,獲取失敗,進入等到佇列 - 執行緒d也來了,同樣4 > 2獲取失敗,進入等待佇列 - 此時等待佇列的狀態:兩個節點等待喚醒,head節點中的Node是通過new Node()方式的"假節點",也就是沒有實際執行的執行緒 - 這個時候執行緒a執行結束,釋放資源數量為5,此時資源剩餘總量為7,嘗試喚醒執行緒c(優先佇列,先喚醒最開始的嘛), 執行緒c喚醒後成功獲取資源,並設定head為執行緒c所在節點,此時資源剩餘量為4;需要注意的是這裡head已經變了(這裡是關鍵), 說明前一個被喚醒的節點已經成功獲取了資源,那麼很有可能還有可用資源,OK,那根據共享模式的傳播特性,再去喚醒下一個共享模式的節點, 因此執行緒d對應的節點也被喚醒了,併成功獲取資源;這個時候在執行中的執行緒有b,c,d

從這個例子可以看出,它並不是盲目喚醒後繼節點,而是根據head變化來判斷上一個喚醒的節點是否已經成功獲取了資源,如果是則說明還有資源可用 否則的話,認為沒有資源可用,不在喚醒後繼節點了(這裡還有一個問題,如果可用資源是4,等待佇列中的節點a,b所需資源分別是5,4,但由於順序關係 節點a被喚醒,嘗試獲取資源失敗,此時儘管b節點所需資源滿足要求,但是不會喚醒b節點,傳播終止...)

1、acquireShared

共享模式下獲取資源

```java public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }

```

  • 通過tryAcquireShared方法嘗試獲取資源,也是模版方法,由子類具體實現
  • 如果獲取失敗,就嘗試放進等待佇列

1.1 doAcquireShared

獲取資源

```java /* * Acquires in shared uninterruptible mode. * @param arg the acquire argument / private void doAcquireShared(int arg) { // 將當前執行緒加入佇列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 前驅節點 final Node p = node.predecessor(); // 同樣,只有前驅節點是head的節點才有機會去獲取資源 if (p == head) { // 模版方法,具體資源獲取由子類實現 int r = tryAcquireShared(arg); // 如果獲取資源成功 if (r >= 0) { // 設定head為當前節點,同時可能的話嘗試喚醒後繼共享模式的節點 // 這裡r表示的資源剩餘量,只有剩餘量大於0才會去嘗試喚醒後繼節點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 獲取失敗後是否需要掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 掛起執行緒,同時在恢復之後檢查中斷標誌 interrupted = true; } } finally { // 如果失敗就了取消這個節點 if (failed) cancelAcquire(node); } }

```

看起來和獨佔模式的獲取資源很像吧,有一點區別在於setHeadAndPropagate - 獨佔模式下,獲取資源成功之後通過setHead改變head就可以了 - 在共享模式下,通過setHeadAndPropagate方法更改head, 同時 如果資源剩餘量r>0說明有資源可用,嘗試喚醒後繼共享模式下的節點去獲取資源 相比獨佔模式多了一個喚醒傳播機制

1.1.1 setHeadAndPropagate

設定head並嘗試喚醒後繼節點

```java // 這裡引數propagate就是tryAcquireShared方法的當前資源剩餘量 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node);

    // propagate > 0 說明有資源可用
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 嘗試喚醒後繼節點
            doReleaseShared();
    }
}

```

1.1.2 doReleaseShared

共享模式下,喚醒後繼節點以確保傳播性;而獨佔模式的僅在資源釋放時,才會去獲取head節點的後繼節點

```java private void doReleaseShared() { // 這裡使用自迴圈是保證在CAS失敗之後也要進行重試 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果後繼節點需要喚醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // 後繼節點不需要喚醒的話,將h的ws設定為PROPAGATE,以確保release之後的傳播性 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS }

        // 這裡很關鍵,如果head改變了就意味著上一個被喚醒的節點已經成功獲得了資源
        // 也就是也許還有資源可以獲取,因此嘗試喚醒後繼節點,以此類推...
        if (h == head)                   // loop if head changed
            break;
    }
}        
//

```

需要關注兩點: - 使用自迴圈是保證在CAS失敗之後也要進行重試 - 通過head來判斷被喚醒的節點是否已經獲取資源(畢竟獲取資源成功之後就會改變head的值); 如果head改變了就意味著上一個被喚醒的節點已經成功獲得了資源, 也就是也許還有資源可以獲取,因此嘗試喚醒後繼節點,以此類推...

2、releaseShared 釋放資源

```java public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 這裡也是通過doReleaseShared來釋放資源 doReleaseShared(); return true; } return false; }

```

三、小結

從原始碼層面來看分析了獨佔模式共享模式的獲取、釋放資源相關原始碼,其中還有其配套的響應中斷的獨佔模式 和 響應中斷的共享模式, 結構上基本一致,只不過在對遇到中斷的情況下做了中斷響應,這裡不在贅述,有興趣可以看看原始碼。

整個AQS的結構都是圍繞著三個核心點展開 - state狀態: 不同的實現類有不同的含義,通俗的理解就是資源 - FIFO佇列: 當嘗試獲取資源失敗之後,得想一個辦法把失敗的執行緒給存起來,這裡也就是給放進佇列裡了;麻煩的是需要考慮在多執行緒環境下維護佇列的執行緒安全性,同時需要剔除取消的節點,考慮如何喚醒後繼節點等... - 獲取/釋放資源方法: 因為不同的實現類對資源的定義不同,因此具體的獲取和釋放資源的方法 通過模版方法的方式交給子類來實現

AQS在具體的工作: - FIFO 是其核心,維護佇列,執行緒掛起、喚醒,取消節點移除、共享模式下的傳播性維護等等,都是為了佇列節點合理的獲取資源,維持其正確性和高效性,保證在多執行緒環境下資源合理的競爭,競爭者被合理的分配去獲取資源。