☕【Java深層系列】「併發程式設計系列」讓我們一起探索一下CountDownLatch的技術原理和原始碼分析

語言: CN / TW / HK

CountDownLatch工作原理分析

一、大致介紹

那麼本篇文章和大家分享分析一下JDK1.8的CountDownLatch的工作原理;

簡單認識CountDownLatch

何為CountDownLatch?

  1. CountDownLatch從字面上理解,count計數做down的減法動作,而Latch又是門閂的意思;

  2. CountDownLatch是一種同步幫助,允許一個或多個執行緒等待,直到在其他執行緒中執行的一組操作完成;

  3. CountDownLatch內部沒有所謂的公平鎖\非公平鎖的靜態內部類,只有一個Sync靜態內部類,CountDownLatch內部基本上也是通過sync.xxx之類的這種呼叫方式的;

  4. CountDownLatch內部維護了一個虛擬的資源池,如果許可數不為為0一直執行緒阻塞等待,直到許可數為0時才釋放繼續往下執行;

CountDownLatch的state關鍵詞

  1. 其實CountDownLatch的實現也恰恰很好利用了其父類AQS的state變數值;

  2. 初始化一個數量值作為計數器的預設值,假設為N,那麼當任何執行緒呼叫一次countDown則計數值減1,直到許可為0時才釋放等待;

  3. CountDownLatch,簡單大致意思為:A組執行緒等待另外B組執行緒,B組執行緒執行完了,A組執行緒才可以執行;

常用重要的方法

建立一個給定許計數值的計數同步器物件

public CountDownLatch(int count)

入隊等待,直到計數器值為0則釋放等待

public void await()

釋放許可,計數器值減1,若計數器值為0則觸發釋放無用結點

public void countDown()

獲取目前最新的共享資源計數器值

public long getCount()

設計與實現虛擬碼

獲取共享鎖:

  • 如果檢測中斷狀態發現被中斷過的話,那麼則丟擲InterruptedException異常
  • 如果嘗試獲取共享鎖失敗的話( 嘗試獲取共享鎖的各種方式由AQS的子類實現 ),
  • 那麼就新增共享鎖結點通過自旋操作加入到佇列中,然後通過呼叫LockSupport.park進入阻塞等待,直到計數器值為零才釋放等待
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

釋放共享鎖:

  • 如果嘗試釋放共享鎖失敗的話( 嘗試釋放共享鎖的各種方式由AQS的子類實現 ),
  • 那麼通過自旋操作完成阻塞執行緒的喚起操作
public void countDown() {
        sync.releaseShared(1);
}

CountDownLatch生活細節化理解

比如百米賽跑,我就以賽跑為例生活化闡述該CountDownLatch原理:

  • 1、場景:百米賽跑十人蔘賽,終點處有一個裁判計數;

  • 2、開跑一聲起跑訊號,十個人爭先恐後的向終點跑去,真的是振奮多秒,令人振奮;

  • 3、當一個人到達終點,這個人就完成了他的賽跑事情了,就沒事一邊玩去了,那麼裁判則減去一個人;

  • 4、隨著人員陸陸續續的都跑到了終點,最後裁判計數顯示還有0個人未到達,意思就是人員都達到了;

  • 5、然後裁判就拿著登記的成績屁顛屁顛去輸入電腦登記了;

  • 6、到此打止,這一系列的動作認為是A組執行緒等待另外其他組執行緒的操作,直到計數器為零,那麼A則再幹其他事情;

原始碼分析CountDownLatch

CountDownLatch構造器

構造器原始碼:

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

2、建立一個給定許計數值的計數同步器物件,計數器值必須大於零,count值最後賦值給了state這個共享資源值;

Sync同步器

AQS --> Sync

CountDownLatch內的同步器都是通過Sync抽象介面來操作呼叫關係的,細看會發現基本上都是通過sync.xxx之類的這種呼叫方式的;

await()

原始碼

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * // 導致當前執行緒等待,直到計數器值減為零則釋放等待,或者由於執行緒被中斷也可導致釋放等待;
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
  • await此方法被呼叫後,則一直會處於等待狀態,其核心還是由於呼叫了LockSupport.park進入阻塞等待;
  • 當計數器值state=0時可以打破等待現狀,當然還有執行緒被中斷後也可以打破執行緒等待現狀;

acquireSharedInterruptibly(int)

原始碼:

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 呼叫之前先檢測該執行緒中斷標誌位,檢測該執行緒在之前是否被中斷過
            throw new InterruptedException(); // 若被中斷過的話,則丟擲中斷異常
        if (tryAcquireShared(arg) < 0) // 嘗試獲取共享資源鎖,小於0則獲取失敗,此方法由AQS的具體子類實現
            doAcquireSharedInterruptibly(arg); // 將嘗試獲取鎖資源的執行緒進行入隊操作
    }

由於是實現同步計數器功能,所以tryAcquireShared首次呼叫必定小於0,則就順利了進入了doAcquireSharedInterruptibly執行緒等待; 至於首次呼叫為什麼會小於0,請看子類的實現,子類的實現判斷為 "(getState() == 0) ? 1 : -1" ;

3.5、tryAcquireShared(int)

原始碼

	protected int tryAcquireShared(int acquires) {
		return (getState() == 0) ? 1 : -1; // 計數器值與零比較判斷,小於零則獲取鎖失敗,大於零則獲取鎖成功
	}

嘗試獲取共享鎖資源,但是在計數器CountDownLatch這個功能中,小於零則需要入隊,進入阻塞佇列進行等待;大於零則喚醒等待佇列,釋放await方法的阻塞等待;

doAcquireSharedInterruptibly(int)

原始碼

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
		// 按照給定的mode模式建立新的結點,模式有兩種:Node.EXCLUSIVE獨佔模式、Node.SHARED共享模式;
        final Node node = addWaiter(Node.SHARED); // 建立共享模式的結點
        boolean failed = true;
        try {
            for (;;) { // 自旋的死迴圈操作方式
                final Node p = node.predecessor(); // 獲取結點的前驅結點
                if (p == head) { // 若前驅結點為head的話,那麼說明當前結點自然不用說了,僅次於老大之後的便是老二了咯
                    int r = tryAcquireShared(arg); // 而且老二也希望嘗試去獲取一下鎖,萬一頭結點恰巧剛剛釋放呢?希望還是要有的,萬一實現了呢
                    if (r >= 0) { // 若r>=0,說明已經成功的獲取到了共享鎖資源
                        setHeadAndPropagate(node, r); // 把當前node結點設定為頭結點,並且呼叫doReleaseShared釋放一下無用的結點
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
					// 但是在await方法首次被呼叫會流轉到此,這個時候獲取鎖資源會失敗,即r<0,所以會進入是否需要休眠的判斷
					// 但是第一次進入休眠方法,因為被建立的結點waitStatus=0,所以會被修改一次為SIGNAL狀態,再次迴圈一次
					// 而第二次迴圈進入shouldParkAfterFailedAcquire方法時,返回true就是需要休眠,則順利呼叫park方式阻塞等待
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 根據前驅結點看看是否需要休息一會兒
                    parkAndCheckInterrupt()) // 阻塞操作,正常情況下,獲取不到共享鎖,程式碼就在該方法停止了,直到被喚醒
					// 被喚醒後,發現parkAndCheckInterrupt()裡面檢測了被中斷了的話,則補上中斷異常,因此拋了個異常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireSharedInterruptibly在實現計數器原理的時候,主要的乾的事情就是等待再等待,等到計數器值為零時才甦醒;

countDown()

原始碼

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1); // 釋放一個許可資源 
    }

釋放許可資源,也就是計數器值不斷的做減1操作,當計數器值為零的時候,該方法將會釋放所有正在等待的執行緒佇列;至於為什麼還會釋放所有,請看後續的releaseShared(int arg)講解;

releaseShared(int)

原始碼:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 嘗試釋放共享鎖資源,此方法由AQS的具體子類實現
            doReleaseShared(); // 自旋操作,喚醒後繼結點
            return true; // 返回true表明所有執行緒已釋放
        }
        return false; // 返回false表明目前還沒釋放完全,只要計數器值不為零的話,那麼都會返回false
    }
  1. releaseShared方法首先就判斷了tryReleaseShared(arg)的返回值,但是計數器值只要不為零,都會返回false,因此releaseShared該方法就立馬返回false了;

  2. 所以當計數器值慢慢減至零時,則立馬返回true,那麼也就立馬會呼叫doReleaseShared釋放所有等待的執行緒佇列;

tryReleaseShared(int)

原始碼:

	// CountDownLatch 的靜態內部類 Sync 類的 tryReleaseShared 方法	
	protected boolean tryReleaseShared(int releases) {
		// Decrement count; signal when transition to zero
		for (;;) { // 自旋的死迴圈操作方式
			int c = getState(); // 獲取最新的計數器值
			if (c == 0) // 若計數器值為零,說明已經通過CAS操作減至零了,所以在併發中讀取到零時並不需要做什麼操作,因此返回false
				return false;
			int nextc = c-1; // 計數器值減1操作
			if (compareAndSetState(c, nextc)) // 通過CAS比較,順利情況下設定成功返回true
				return nextc == 0; // 當通過計算操作得到的nextc為零時通過CAS修改成功,那麼表明所有事情都已經做完,需要釋放所有等待的執行緒佇列
			// 若CAS失敗,想都不用想肯定是由於併發操作,導致CAS失敗,那麼唯一可做的就是下一次迴圈檢視是否已經被其他執行緒處理了
		}
	}

CountDownLatch的靜態內部類實現父類AQS的方法,用來處理如何釋放鎖,籠統的講,若返回負數則需要進入阻塞佇列,否則需要釋放所有等待佇列;

doReleaseShared()

主要目的是釋放執行緒中所有等待的佇列,當計數器值為零時,此方法馬上會被呼叫,通過自旋方式輪詢幹掉所有等待的佇列;

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) { // 自旋的死迴圈操作方式
            Node h = head; // 每次都是取出佇列的頭結點
            if (h != null && h != tail) { // 若頭結點不為空且也不是隊尾結點
                int ws = h.waitStatus; // 那麼則獲取頭結點的waitStatus狀態值
                if (ws == Node.SIGNAL) { // 若頭結點是SIGNAL狀態則意味著頭結點的後繼結點需要被喚醒了
					// 通過CAS嘗試設定頭結點的狀態為空狀態,失敗的話,則繼續迴圈,因為併發有可能其它地方也在進行釋放操作
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // 喚醒頭結點的後繼結點
                }
				// 如頭結點為空狀態,則把其改為PROPAGATE狀態,失敗的則可能是因為併發而被改動過,則再次迴圈處理
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
			// 若頭結點沒有發生什麼變化,則說明上述設定已經完成,大功告成,功成身退
			// 若發生了變化,可能是操作過程中頭結點有了新增或者啥的,那麼則必須進行重試,以保證喚醒動作可以延續傳遞
            if (h == head)                   // loop if head changed
                break;
        }
    }

總結

1、有了分析AQS的基礎後,再來分析CountDownLatch便快了很多;

2、在這裡我簡要總結一下CountDownLatch的流程的一些特性:
	• 管理一個大於零的計數器值;
	• 每countDown一次則state就減1一次,直到許可證數量等於0則釋放佇列中所有的等待執行緒;
	• 也可以通過countDown/await組合一起使用,來實現CyclicBarrier的功能;

CountDownLatch用法

CountDownLatch類只提供了一個構造器:

public CountDownLatch(int count) {  };  //引數count為計數值

然後下面這3個方法是CountDownLatch類中最重要的方法:

public void await() throws InterruptedException { };   //呼叫await()方法的執行緒會被掛起,它會等待直到count值為0才繼續執行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()類似,只不過等待一定的時間後count值還沒變為0的話就會繼續執行
public void countDown() { };  //將count值減1

CountDownLatch, 一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。

舉個例子說明:

package main.java.CountDownLatch; 
import java.util.concurrent.CountDownLatch;

public class countDownlatchTest {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for(int i=0;i<5;i++){
            new Thread(new readNum(i,countDownLatch)).start();
        }
        countDownLatch.await();
        System.out.println("執行緒執行結束。。。。");
    }
 
    static class readNum  implements Runnable{
        private int id;
        private CountDownLatch latch;
        public readNum(int id,CountDownLatch latch){
            this.id = id;
            this.latch = latch;
        }
        @Override
        public void run() {
            synchronized (this){
                System.out.println("id:"+id);
                latch.countDown();
                System.out.println("執行緒組任務"+id+"結束,其他任務繼續");
            }
        }
    }
}

輸出結果:

id:1
執行緒組任務1結束,其他任務繼續
id:0
執行緒組任務0結束,其他任務繼續
id:2
執行緒組任務2結束,其他任務繼續
id:3
執行緒組任務3結束,其他任務繼續
id:4
執行緒組任務4結束,其他任務繼續
執行緒執行結束。。。。

執行緒在countDown()之後,會繼續執行自己的任務
「其他文章」