Java 併發程式設計解析 | 基於JDK原始碼解析Java領域中的併發鎖,我們可以從中學習到什麼內容?
蒼穹之邊,浩瀚之摯,眰恦之美; 悟心悟性,善始善終,惟善惟道! —— 朝槿《朝槿兮年說》
寫在開頭
在Java領域中, 尤其是在併發程式設計領域,對於多執行緒併發執行一直有兩大核心問題:同步和互斥。其中:
- 互斥(Mutual Exclusion):一個公共資源同一時刻只能被一個程序或執行緒使用,多個程序或執行緒不能同時使用公共資源。即就是同一時刻只允許一個執行緒訪問共享資源的問題。
- 同步(Synchronization):兩個或兩個以上的程序或執行緒在執行過程中協同步調,按預定的先後次序執行。即就是執行緒之間如何通訊、協作的問題。
針對對於這兩大核心問題,利用管程是能夠解決和實現的,因此可以說,管程是併發程式設計的萬能鑰匙。
雖然,Java在基於語法層面(synchronized 關鍵字)實現了對管程技術,但是從使用方式和效能上來說,內建鎖(synchronized 關鍵字)的粒度相對過大,不支援超時和中斷等問題。
為了彌補這些問題,從JDK層面對其“重複造輪子”,在JDK內部對其重新設計和定義,甚至實現了新的特性。
關健術語
本文用到的一些關鍵詞語以及常用術語,主要如下:
- 訊號量(Semaphore): 是在多執行緒環境下使用的一種設施,是可以用來保證兩個或多個關鍵程式碼段不被併發呼叫,也是作系統用來解決併發中的互斥和同步問題的一種方法。
- 訊號量機制(Semaphores): 用來解決同步/互斥的問題的,它是1965年,荷蘭學者 Dijkstra提出了一種卓有成效的實現程序互斥與同步的方法。
- 管程(Monitor) : 一般是指管理共享變數以及對共享變數的操作過程,讓它們支援併發的一種機制。
基本概述
在Java領域中,我們可以將鎖大致分為基於Java語法層面(關鍵詞)實現的鎖和基於JDK層面實現的鎖。
在Java領域中,從JDK原始碼分析來看,基於JDK層面實現的鎖大致主要可以分為以下4種方式:
- 基於Lock介面實現的鎖
- 基於ReadWriteLock介面實現的鎖
- 基於AQS基礎同步器實現的鎖
- 基於自定義API操作實現的鎖
從閱讀原始碼不難發現,在Java SDK 併發包主要通過AbstractQueuedSynchronizer(AQS)實現多執行緒同步機制的封裝與定義,而通過Lock 和 Condition 兩個介面來實現管程,其中 Lock 用於解決互斥問題,Condition 用於解決同步問題。
一. 基本理論
在併發程式設計領域,有兩大核心問題:一個是互斥,即同一時刻只允許一個執行緒訪問共享資源;另一個是同步,即執行緒之間如何通訊、協作。
在作業系統中,一般有如果I/O操作時,對於阻塞和非阻塞是從函式呼叫角度來說的,其中:
- 阻塞:如果讀寫操作沒有就緒或者完成,則函式一直等待。
- 非阻塞: 函式立即呼叫,然後讓應用程式輪詢迴圈。
而同步和非同步則是從“讀寫是主要是由誰完成”的角度來說的,其中:
- 同步: 讀寫操作主要交給應用程式完成
- 非同步: 讀寫操作主要由作業系統完成,一般完成之後,回撥函式和事件通知應用程式。
其中,訊號量機制(Semaphores)是用來解決同步/互斥的問題的,但是訊號量(Semaphore)的操作分散在各個程序或執行緒中,不方便進行管理,因每次需呼叫P/V(來自荷蘭語 proberen和 verhogen)操作,還可能導致死鎖或破壞互斥請求的問題。
由於PV操作對於解決程序互斥/同步程式設計複雜,因而在此基礎上提出了與訊號量等價的——“管程技術”。
其中,管程(Monitor)當中定義了共享資料結構只能被管程內部定義的函式所修改,所以如果我們想修改管程內部的共享資料結構的話,只能呼叫管程內部提供的函式來間接的修改這些資料結構。
一般來說,管程(Monitor)和訊號量(Semaphore)是等價的,所謂等價指的是用管程能夠實現訊號量,也能用訊號量實現管程。
在管程的發展歷程上,先後出現過Hasen模型、Hoare模型和MESA模型等三種不同的管程模型,現在正在廣泛使用的是MESA模型。
在MESA模型中,管程中引入了條件變數(Conditional Variable)的概念,而且每個條件變數都對應有一個等待佇列(Wait Queue)。其中,條件變數和等待佇列的作用是解決執行緒之間的同步問題。
而對於解決執行緒之間的互斥問題,將共享變數(Shared Variable)及其對共享變數的操作統一封裝起來,一般主要是實現一個執行緒安全的阻塞佇列(Blocking Queue),將執行緒不安全的佇列封裝起來,對外提供執行緒安全的操作方法,例如入隊操作(Enqueue)和出隊操作(Dequeue)。
在Java領域中,對於Java語法層面實現的鎖(synchronized 關鍵字), 其實就是參考了 MESA 模型,並且對 MESA 模型進行了精簡,一般在MESA 模型中,條件變數可以有多個,Java 語言內建的管程(synchronized)裡只有一個條件變數。
這就意味著,被synchronized 關鍵字修飾的程式碼塊或者直接標記靜態方法以及例項方法,在編譯期會自動生成相關加鎖(lock)和解鎖(unlock)的程式碼,即就是monitorenter和monitorexit指令。
對於synchronized 關鍵字來說,主要是在Java HotSpot(TM) VM 虛擬機器通過Monitor(監視器)來實現monitorenter和monitorexit指令的。
同時,在Java HotSpot(TM) VM 虛擬機器中,每個物件都會有一個監視器,監視器和物件一起建立、銷燬。
監視器相當於一個用來監視這些執行緒進入的特殊房間,其義務是保證(同一時間)只有一個執行緒可以訪問被保護的臨界區程式碼塊。
本質上,監視器是一種同步工具,也可以說是JVM對管程的同步機制的封裝實現,主要特點是:
- 同步:監視器所保護的臨界區程式碼是互斥地執行的。一個監視器是一個執行許可,任一執行緒進入臨界區程式碼都需要獲得這個許可,離開時把許可歸還。
- 協作:監視器提供Signal機制,允許正持有許可的執行緒暫時放棄許可進入阻塞等待狀態,等待其他執行緒傳送Signal去喚醒;其他擁有許可的執行緒可以傳送Signal,喚醒正在阻塞等待的執行緒,讓它可以重新獲得許可並啟動執行。
在Hotspot虛擬機器中,監視器是由C++類ObjectMonitor實現的,ObjectMonitor類定義在ObjectMonitor.hpp檔案中,其中:
- Owner: 指向的執行緒即為獲得鎖的執行緒
- Cxq:競爭佇列(Contention Queue),所有請求鎖的執行緒首先被放在這個競爭佇列中
- EntryList:物件實體列表,表示Cxq中那些有資格成為候選資源的執行緒被移動到EntryList中。
- WaitSet:類似於等待佇列,某個擁有ObjectMonitor的執行緒在呼叫Object.wait()方法之後將被阻塞,然後該執行緒將被放置在WaitSet連結串列中。
同時,管程與Java中面向物件原則(Object Oriented Principle)也是非常契合的,主要體現在 java.lang.Object類中wait()、notify()、notifyAll() 這三個方法,其中:
- wait()方法: 阻塞執行緒並且進入等待佇列
- notify()方法:隨機地通知等待佇列中的一個執行緒
- notifyAll()方法: 通知等待佇列中的所有執行緒
不難發現,在Java中synchronized 關鍵字及 java.lang.Object類中wait()、notify()、notifyAll() 這三個方法都是管程的組成部分。
由此可見,我們可以得到一個比較通用的併發同步工具基礎模型,大致包含如下幾個內容,其中:
- 條件變數(Conditional Variable): 利用執行緒間共享的變數進行同步的一種工作機制
- 共享變數((Shared Variable)):一般指物件實體物件的成員變數和屬性
- 阻塞佇列(Blocking Queue):共享變數(Shared Variable)及其對共享變數的操作統一封裝
- 等待佇列(Wait Queue):每個條件變數都對應有一個等待佇列(Wait Queue),內部需要實現入隊操作(Enqueue)和出隊操作(Dequeue)方法
- 變數狀態描述機(Synchronization Status):描述條件變數和共享變數之間狀態變化,又可以稱其為同步狀態
- 工作模式(Operation Mode): 執行緒資源具有排他性,因此定義獨佔模式和共享模式兩種工作模式
綜上所述,條件變數和等待佇列的作用是解決執行緒之間的同步問題;共享變數與阻塞佇列的作用是解決執行緒之間的互斥問題。
二.AQS基礎同步器的設計與實現
在Java領域中,同步器是專門為多執行緒併發設計的同步機制,主要是多執行緒併發執行時執行緒之間通過某種共享狀態來實現同步,只有當狀態滿足這種條件時執行緒才往下執行的一種同步機制。
對於多執行緒實現實現併發處理機制來說,一直以來,多執行緒都存在2個問題:
- 執行緒之間記憶體共享,需要通過加鎖進行控制,但是加鎖會導致效能下降,同時複雜的加鎖機制也會增加程式設計編碼難度
- 過多執行緒造成執行緒之間的上下文切換,導致效率低下
因此,在併發程式設計領域中,一直有一個很重要的設計原則: “ 不要通過記憶體共享來實現通訊,而應該通過通訊來實現記憶體共享。”
簡單來說,就是儘可能通過訊息通訊,而不是記憶體共享來實現程序或者執行緒之間的同步。
其中,同步器是專門為多執行緒併發設計的同步機制,主要是多執行緒併發執行時執行緒之間通過某種共享狀態來實現同步,只有當狀態滿足這種條件時執行緒才往下執行的一種同步機制。
由於在不同的應用場景中,對於同步器的需求也會有所不同,一般在我們自己去實現和設計一種併發工具的時候,都需會考慮以下幾個問題:
- 是否支援響應中斷? 如果阻塞狀態的執行緒能夠響應中斷訊號,也就是說當我們給阻塞的執行緒傳送中斷訊號的時候,能夠喚醒它,那它就有機會釋放曾經持有的鎖。
- 是否支援超時?如果執行緒在一段時間之內沒有獲取到鎖,不是進入阻塞狀態,而是返回一個錯誤,那這個執行緒也有機會釋放曾經持有的鎖。
- 是否支援非阻塞地獲取鎖資源 ? 如果嘗試獲取鎖失敗,並不進入阻塞狀態,而是直接返回,那這個執行緒也有機會釋放曾經持有的鎖。
從閱讀JDK原始碼不難發現,主要是採用設計模式中模板模式的原則,JDK將各種同步器中相同的部分抽象封裝成了一個統一的基礎同步器,然後基於基礎同步器為模板通過繼承的方式來實現不同的同步器。
也就是說,在實際開發過程中,除了直接使用JDK實現的同步器,還可以基於這個基礎同步器我們也可以自己自定義實現符合我們業務需求的同步器。
在JDK原始碼中,同步器位於java.util.concurrent.locks包下,其基本定義是AbstractQueuedSynchronizer類,即就是我們常說的AQS同步器。
1. 設計思想
一個標準的AQS同步器主要有同步狀態機制,等待佇列,條件佇列,獨佔模式,共享模式等五大核心要素組成。
JDK的JUC(java.util.concurrent.)包中提供了各種併發工具,但是大部分同步工具的實現基於AbstractQueuedSynchronizer類實現,其內部結構主要如下:
- 同步狀態機制(Synchronization Status):主要用於實現鎖(Lock)機制,是指同步狀態,其要求對於狀態的更新必須原子性的
- 等待佇列(Wait Queue):主要用於存放等待執行緒獲取到的鎖資源,並且把執行緒維護到一個Node(節點)裡面和維護一個非阻塞的CHL Node FIFO(先進先出)佇列,主要是採用自旋鎖+CAS操作來保證節點插入和移除的原子性操作。
- 條件佇列(Condition Queue):用於實現鎖的條件機制,一般主要是指替換“等待-通知”工作機制,主要是通過ConditionObject物件實現Condition介面提供的方法實現。
- 獨佔模式(Exclusive Mode):主要用於實現獨佔鎖,主要是基於靜態內部類Node的常量標誌EXCLUSIVE來標識該節點是獨佔模式
- 共享模式(Shared Mode):主要用於實現共享鎖,主要是基於靜態內部類Node的常量標誌SHARED來標識該節點是共享模式
其中,對於AbstractQueuedSynchronizer類的實現原理,我們可以從如下幾個方面來看:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691 L; protected AbstractQueuedSynchronizer() {} /** * 等待佇列: head-頭節點 */ private transient volatile Node head; /** * 等待佇列: tail-尾節點 */ private transient volatile Node tail; /** * 同步狀態:32位整數型別,更新同步狀態(state)時必須保證其是原子性的 */ private volatile int state; /** * 自旋鎖消耗超時時間閥值(threshold): threshold < 1000ns時,表示競爭時選擇自旋;threshold > 1000ns時,表示競爭時選擇系統阻塞 */ static final long spinForTimeoutThreshold = 1000 L; /** * CAS原子性操作 */ private static final Unsafe unsafe = Unsafe.getUnsafe(); /** * stateOffset */ private static final long stateOffset; /** * headOffset */ private static final long headOffset; /** * tailOffset */ private static final long tailOffset; /** * waitStatusOffset */ private static final long waitStatusOffset; /** * nextOffset */ private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } }
[1]. AbstractQueuedSynchronizer類的實現原理是繼承了基於AbstractOwnableSynchronizer類的抽象類,其中主要對AQS同步器的通用特性和方法進行抽象封裝定義,主要包括如下方法:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961 L; protected AbstractOwnableSynchronizer() {} /** * 同步器擁有者 */ private transient Thread exclusiveOwnerThread; /** * 設定同步器擁有者:把執行緒當作引數傳入,指定某個執行緒為獨享 */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } /** * 獲取同步器擁有者:獲取指定的某個執行緒 */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
- setExclusiveOwnerThread(Thread thread)方法: 把某個執行緒作為引數傳入,從而設定AQS同步器的所有者,即就是我們設定的某個執行緒
- getExclusiveOwnerThread()方法: 獲取當前AQS同步器的所有者,即就是我們指定的某個執行緒
[2]. 對於同步狀態(state),其型別是32位整數型別,並且是被volatile修飾的,表示在更新同步狀態(state)時必須保證其是原子性的。
[3]. 對於等待佇列的結構,主要是在Node定義了head和tail變數,其中head表示頭部節點,tail表示尾部節點
[4].對於等待佇列的結構提到的Node類來說,主要內容如下:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on */ volatile Node prev; /** * Link to the successor node that the current node/thread */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
- 標記Node的工作模式常量標記:主要維護了SHARED和EXCLUSIVE等2個靜態字面常量,其中 SHARED 用於標記Node中是共享模式,EXCLUSIVE:用於標記Node中是獨享模式
- 標記等待狀態的靜態字面常量標記: 主要維護了0(表示無狀態),SIGNAL(-1,表示後續節點中的執行緒通過park進入等待,當前節點在釋放和取消時,需要通過unpark解除後後續節點的等待),CANCELLED(1,表示當前節點中的執行緒因為超時和中斷被取消),CONDITION(-2,表示當前節點在條件佇列中),PROPAGATE(-3,SHARED共享模式的頭節點描述狀態,表示無條件往下傳播)等5個靜態字面常量
- 維護了一個等待狀態(waitStatus): 主要用於描述等待佇列中節點的狀態,其取值範圍為0(waitStatus=0,表示無狀態),SIGNAL(waitStatus=-1,表示等待訊號狀態),CANCELLED(waitStatus=1,表示取消狀態),CONDITION(waitStatus=-2,表示條件狀態),PROPAGATE(waitStatus=-3,表示SHARED共享模式狀態)等5個靜態字面常量,CAS操作時寫入,預設值為0。
- 維護了Node的2個結構節點變數: 主要是prev和next,其中,prev表示前驅節點,next表示後續節點,表示構成雙向向連結串列,構成了等待佇列的資料結構
- 維護了一個狀態工作模式標記: 主要是維護了一個nextWaiter,用於表示在等待佇列中當前節點在是共享模式還是獨享模式,而對於條件佇列來說,用於組成單向連結串列結構
- 維護了一個執行緒物件變數: 主要用於記錄當前節點中的執行緒thread
[5].對於自旋鎖消耗超時時間閥值(spinForTimeoutThreshold),主要表示系統依據這個閥值來選擇自旋方式還是系統阻塞。一般假設這個threshold,當 threshold < 1000ns時,表示競爭時選擇自旋;否則,當threshold > 1000ns時,表示競爭時選擇系統阻塞
[6].對於帶有Offset 等變數對應各自的控制代碼,主要用於執行CAS操作。在JDK1.8版本之前,CAS操作主要通過Unsafe類來說實現;在JDK1.8版本之後,已經開始利用VarHandle來替代Unsafe類操作實現。
[7].對於CAS操作來說,主要提供瞭如下幾個方法:
- compareAndSetState(int expect, int update)方法:CAS操作原子更新狀態
- compareAndSetHead(Node update)方法:CAS操作原子更新頭部節點
- compareAndSetTail(Node expect, Node update)方法:CAS操作原子更新尾部節點
- compareAndSetWaitStatus(Node node, int expect,int update)方法:CAS操作原子更新等待狀態
- compareAndSetNext(Node node,Node expect,Node update)方法:CAS操作原子更新後續節點
[8].對於條件佇列(ConditionObject)來說,主要內容如下:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699 L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; /** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() {} }
- 基於Condition的介面實現條件佇列,其核心主要是實現阻塞和喚醒的工作機制
- 基於Node定義了firstWaiter和lastWaiter變數,其中,firstWaiter表示的是頭節點,lastWaiter是尾節點
- 還定義了2個字面常量REINTERRUPT和THROW_IE,其中REINTERRUPT=1,描述的是當中斷是退出條件佇列,THROW_IE=-1表示的是發生異常時退出
[8].除此之外,在AQS基礎同步器中,一般可以通過構造方法直接將引數值賦給對應變數,也可以通過變數控制代碼進行賦值操作:
- isShared()方法: 用於判斷等待佇列是否為共享模式
- predecessor()方法: 用於獲取當前節點對應的前驅節點,如果為空,則 throw new NullPointerException();
2. 基本實現
一個標準的AQS同步器最核心底層設計實現是一個非阻塞的CHL Node FIFO(先進先出)佇列資料結構,通過採用自旋鎖+CAS操作的方法來保證原子性操作。
總的來說,一個AQS基礎同步器,底層的資料結構採用的是一個非阻塞的CHL Node FIFO(先進先出)佇列資料結構,而實現的核心演算法則是採用自旋鎖+CAS操作的方法。
首先,對於非阻塞的CHL Node FIFO(先進先出)佇列資料結構,一般來說,FIFO(First In First Out,先進先出)佇列是一個有序列表,屬於抽象型資料型別(Abstract Data Type,ADT),所有的插入和刪除操作都發生在隊首(Front)和隊尾(Rear)兩端,具有先進先出的特性。
/** * 等待佇列: head-頭節點 */ private transient volatile Node head; /** * 等待佇列: tail-尾節點 */ private transient volatile Node tail; /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
在AQS同步器的原始碼中,主要是通過靜態內部類Node來實現的這個非阻塞的CHL Node FIFO(先進先出)佇列資料結構, 維護了兩個變數head和tail,其中head對應隊首(Front),tail對應隊尾(Rear)。同時,還定義了addWaiter(Node mode)方法來表示入隊操作,其中有個enq(final Node node)方法,主要用於初始化佇列中head和tail的設定。
其次,AQS同步器以CLH鎖為基礎,其中CLH鎖是一種自旋鎖,對於自旋鎖的實現方式來看,主要可以分為普通自旋鎖和自適應自旋鎖,CLH鎖和MCS鎖等4種,其中:
- 普通自旋鎖:多個執行緒不斷自旋,不斷嘗試獲取鎖,其不具備公平性和由於要保證CPU和快取以及主存之間的資料一致性,其開銷較大。
- 自適應自旋鎖:主要是為解決普通自旋鎖的公平性問題,引入了一個排隊機制,一般稱為排他自旋鎖,其具備公平性,但是沒有解決保證CPU和快取以及主存之間的資料一致性問題,其開銷較大。
- CLH鎖:通過一定手段將執行緒對於某一個共享變數的輪詢競爭轉化為一個執行緒佇列,且佇列中的執行緒各自輪詢自己本地變數。
- MCS鎖:主旨在於解決 CLH鎖的問題,也是基於FIFO佇列,與CLH鎖不同是,只對本地變數自旋,前驅節點負責通知MCS鎖中執行緒自適結束。
自旋鎖是一種實現同步的方案,屬於一種非阻塞鎖,與常規鎖主要的區別就在於獲取鎖失敗之後的處理方式不同,主要體現在:
- 一般情況下,常規鎖在獲取鎖失敗之後,會將執行緒阻塞並適當時重新喚醒
- 而自旋鎖則是使用自旋來替換阻塞操作,主要是執行緒會不斷迴圈檢查該鎖是否被釋放,一旦釋放執行緒便會獲取鎖資源。
從本質上講,自旋是一鍾忙等待狀態,會一直消耗CPU的執行時間。一般情況下,常規互斥鎖適用於持有鎖長時間的情況,自旋鎖適合持有時間短的情況。
其中,對於CLH鎖來說,其核心是為解決同步帶來的花銷問題,Craig,Landim,Hagersten三人發明了CLH鎖,其中主要是:
- 構建一個FIFO(先進先出)佇列,構建時主要通過移動尾部節點tail來實現佇列的排隊,每個想獲得鎖的執行緒都會建立一個新節點(next)並通過CAS操作原子操作將新節點賦予給tail,當前執行緒輪詢前一個節點的狀態。
- 執行完執行緒後,只需將當前執行緒對應節點狀態設定為解鎖即可,主要是判斷當前節點是否為尾部節點,如果是直接設定尾部節點設定為空。由於下一個節點一直在輪詢,所以可以獲得鎖。
CLH鎖將眾多執行緒長時間對資源的競爭,通過有序化這些執行緒將其轉化為只需要對本地變數檢測。唯一存在競爭的地方就是入隊之前對尾部節點tail 的競爭,相對來說,當前執行緒對資源的競爭次數減少,這節省了CPU快取同步的消耗,從而提升了系統性能。
但是同時也有一個問題,CLH鎖雖然解決了大量執行緒同時操作同一個變數時帶來的開銷問題,如果前驅節點和當前節點在本地主存中不存在,則訪問時間過長,也會引起效能問題。
為了讓CLH鎖更容易實現取消和超時的功能,AQS同步器在設計時進行了改造,主要體現在:節點的結構和節點等待機制。其中:
- 節點的結構: 主要引入了頭節點和尾節點,分別指向佇列頭部和尾部,對於鎖的相關操作都與其息息相關,並且每個節點都引入了前驅節點和後繼節點。
- 節點等待機制: 主要在原來的自旋基礎上增加了系統阻塞喚醒,主要體現在 自旋鎖消耗超時時間閥值(threshold): threshold < 1000ns時,表示競爭時選擇自旋;threshold > 1000ns時,表示競爭時選擇系統阻塞。
由此可見,主要是通過前驅節點和後繼節點的引用連線起來形成一個連結串列佇列,其中對於入隊,檢測節點,出隊,判斷超時,取消節點等操作主要如下:
- 入隊(enqueue): 主要採用一個無限迴圈進行CAS操作,即就是使用自旋方式競爭直到成功。
- 檢測節點(checkedPrev): 一般在入隊完成後,主要是檢測判斷當前節點的前驅節點是否為頭節點, 一般自旋方式是直接進入迴圈檢測,而系統阻塞方式是當前執行緒先檢測,其中如果是頭節點併成功獲取鎖,則直接返回,當前執行緒不阻塞,否則對當前執行緒進行阻塞。
- 出隊(dequeue):主要負責喚醒等待佇列中的後繼節點,並且按照條件往下傳播有序執行
- 判斷超時(checkedTimeout): 佇列中等待鎖的執行緒可能因為中斷或者超時的情況,當總耗時大於等於自定義耗時就直接返回,即就是
- 取消節點(cancel): 主要是對於中斷和超時而涉及到取消操作,而且這樣的情況不再參與鎖競爭,即就是一般通過呼叫compareAndSetNext(Node node, Node expect,Node update)來進行CAS操作。
特別值得注意的是,AQS基礎同步器中主要有等待佇列和條件佇列兩種對列結構,對比便不難發現:等待佇列採用的底層資料結構是雙向連結串列結構,而對於條件佇列則是單向連結串列結構。
最後,AQS同步器中使用了CAS操作,其中CAS(Compare And Swap,比較並交換)操作時一種樂觀鎖策略,主要涉及三個操作資料:記憶體值,預期值,新值,主要是指當且僅當預期值和記憶體值相等時才去修改記憶體值為新值。
一般來說,CAS操作的具體邏輯,主要可以分為三個步驟:
- 首先,檢查某個記憶體值是否與該執行緒之前取到值一樣。
- 其次,如果不一樣,表示此記憶體值已經被別的執行緒修改,需要捨棄本次操作。
- 最後,如果時一樣,表示期間沒有執行緒更改過,則需要用新值執行更新記憶體值。
除此之外,需要注意的是CAS操作具有原子性,主要是由CPU硬體指令來保證,並且通過Java本地介面(Java Native Interface,JNI)呼叫本地硬體指令實現。
當然,CAS操作避免了悲觀策略獨佔物件的 問題,同時提高了併發效能,但是也有以下三個問題:
- 樂觀策略只能保證一個共享變數的原子操作,如果是多個變數,CAS便不如互斥鎖,主要是CAS操作的侷限所致。
- 長時間迴圈操作可能導致開銷過大。
- 經典的ABA問題: 主要是檢查某個記憶體值是否與該執行緒之前取到值一樣,這個判斷邏輯不嚴謹。解決ABA問題的核心在於,引入版本號,每次更新變數值更新版本號。
而在AQS同步器中,為了保證併發實現保證原子性,而且是硬體級別的原子性,一般是通過JNI(Java native interface,Java 本地介面)方式讓Java程式碼呼叫C/C++原生代碼。
通過分析原始碼可知,一般使用Unsafe類需要只用關注如下方法即可:
public final class Unsafe { private static final Unsafe theUnsafe; private static native void registerNatives(); private Unsafe() {} @CallerSensitive public static Unsafe getUnsafe() { Class var0 = Reflection.getCallerClass(); if (!VM.isSystemDomainLoader(var0.getClassLoader())) { throw new SecurityException("Unsafe"); } else { return theUnsafe; } } public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); public native void unpark(Object var1); public native void park(boolean var1, long var2); }
- rgisterNatives()方法:是一個靜態 方法,主要用於註冊本地方法
- 建構函式是private私有化的,一般無法通過建構函式來例項化Unsafe物件
- getUnsafe()方法:是用來獲取Unsafe物件的,雖然是公有化的,但是如果Java語言開發層面的對進行安全檢查
一般地,由於Unsafe類的操作涉及到硬體底層的操作,JDK對其例項化做了安全校驗,只有受系統信任的程式碼才對其例項化,主要是通過類載入器來解析,其例項化方式主要有如下方式:
- 第一種:直接呼叫該方法,主要新式是Unsafe.getUnsafe()。但是,對於我們實際開發來說,這種方式無法通過安全校驗行不通,系統會丟擲 throw new SecurityException("Unsafe")資訊
- 第二種:通過反射機制繞過安全檢查,主要是修改Unsafe類中theUnsafe欄位的訪問許可權,讓其能被訪問從而達到獲取Unsafe物件的目的
需要注意的是,在Java領域中,對於CAS操作實現,主要有兩點問題:
- JDK1.8版本之前,CAS操作主要使用Unsafe類來執行底層操作,一般併發和執行緒操作時,主要用compareAndSwapObject,compareAndSwapInt,compareAndSwapLong等來實現CAS,而對於執行緒排程主要是park和unpark方法,其主要在sun.misc包下面。
- JDK1.8版本之後,JDK1.9的CAS操作主要使用VarHandle類,只是用VarHandle替代了一部分Unsafe類的操作,但是對於新版本中Unsafe,本質上Unsafe類會間接呼叫jdk.internal.misc包下面Unsafe類來實現。
3. 具體實現
在Java領域中,AQS同步器利用獨享模式和共享模式來實現同步機制,主要為解決多線併發執行中資料競爭和競爭條件問題。
為解決多線併發執行中資料競爭和競爭條件問題,引入了同步機制,主要是通過控制共享資料和臨界區的訪問,一般比較通用的方式是通過鎖機制來實現。
在Java領域中,JDK對於AQS基礎同步器抽象封裝了鎖的獲取和釋放操作,主要提供了獨享和共享兩種工作模式:
- 獨享模式(Exclusive Mode) :對應著獨享鎖(Exclusive Lock),表示著對於鎖的獲取和釋放,一次只能至多一個或者只有一個執行緒把持,其他執行緒無法獲得並獲得持有,必須等待持有執行緒釋放鎖。
- 共享模式(Shared Mode) :對應著共享鎖(Shared Lock),表示著對於鎖的獲取和釋放,一次可以至少一個或者允許多個執行緒把持,其他執行緒可以獲得並獲得持有,不用等待持有執行緒釋放鎖。
其中,AQS基礎同步器對於獨享模式和共享模式的工作模式的基本流程,主要如下:
- 獲取鎖流程: 先嚐試獲取鎖,如果獲取成功則往下繼續進行,否則把執行緒維護到等待佇列中,執行緒可能會掛起。
- 釋放鎖流程:喚醒等待佇列中的一個或者多個執行緒去嘗試獲取需要釋放的鎖。
一般地,AQS基礎同步器對於獨享模式和共享模式的封裝和實現,其中:
3.1. 獨享模式的技術實現
[1].獲取鎖操作相關的核心邏輯,主要如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** * 獨佔模式:[1].通過acquire獲取鎖操作 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * 獨佔模式:[2].通過tryAcquire嘗試獲取鎖操作 */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 獨佔模式:[3].通過addWaiter入隊操作 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * 獨佔模式:[4].通過acquireQueued檢測節點 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * 獨佔模式:[5].通過cancelAcquire取消鎖獲取 */ private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } } }
[2]. 釋放鎖操作相關的核心邏輯,主要如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** * 獨佔模式:[1].通過release釋放鎖操作 */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * 獨佔模式:[2].通過tryRelease嘗試釋放鎖操作 */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * 獨佔模式:[3].通過unparkSuccessor喚醒後繼節點 */ private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } }
由此可見,對於獨佔模式的鎖獲取和釋放,主要是依據acquire和release等方法來實現。
3.1. 共享模式的技術實現
[1].獲取鎖操作相關的核心邏輯,主要如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** * 共享模式:[1].通過acquireShared獲取鎖操作 */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * 共享模式:[2].通過tryAcquireShared嘗試獲取鎖操作 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } /** * 共享模式:[3].通過doAcquireShared入隊操作 */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } }
[2].釋放鎖操作相關的核心邏輯,主要如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** * 共享模式:[1].通過releaseShared釋放鎖操作 */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } /** * 共享模式:[2].通過tryReleaseShared嘗試釋放鎖操作 */ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /** * 共享模式:[3].通過doReleaseShared釋放鎖就緒操作 */ private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } } }
由此可見,對於共享模式的鎖獲取和釋放,主要是依據acquireShared和releaseShared等方法來實現。
綜上所述,不難看出,AQS同步器設計思想是通過繼承的方式提供一個模板,其核心原理是管理一個共享狀態,通過對狀態的控制來實現不同的控制。
二. LockSupport的設計與實現
在Java領域中,LockSupport主要從執行緒資源角度為同步器和鎖提供基本執行緒阻塞和喚醒原語,是“等待-通知”工作機制的實現。
一般來說,當一個執行緒(Thread)只要參與鎖競爭時,其經歷的主要流程有:
- 一旦當前執行緒進行鎖競爭時,執行緒都會嘗試獲取鎖,根據獲取鎖的情況進行後續處理。
- 如果獲取鎖失敗,則會建立節點插入到佇列的尾部,會二次嘗試重新獲取鎖,並不會阻塞當前執行緒。
- 如果獲取鎖成功,則直接返回,否則會將節點設定為待執行狀態(SIGNAL)。
- 最後對當前執行緒進行阻塞,當前驅節點執行完成後會喚醒後繼節點。
在Java領域中,對於執行緒的阻塞和喚醒,也許我們最早在學習面向物件原則的時候,一般都使用java.lang.Object類中wait()、notify()、notifyAll() 這三個方法,可以用它們幫助我們實現等待-通知”工作機制。
同時,在講解AQS基礎同步器的實現時,提到說CAS操作的核心是使用Unsafe類來執行底層操作,對於執行緒排程主要是park和unpark方法,但是一般的Java語言層 main開發對其呼叫又有安全檢查的限制。
但是,在AQS基礎同步的的阻塞和喚醒操作咋在獲取鎖餓的鎖操作中需要使用,一般地:
- 如果獲取不到鎖的當前執行緒在進入排到佇列之後需要阻塞當前執行緒。
- 並且,排到佇列中前驅節點執行完成後,需要負責喚醒後繼節點。
於是,在AQS基礎同步器的設計與實現中,封裝一個專門用於實現“等待-通知”工作機制的LockSupport類。
對於LockSupport類,主要是為同步器和鎖提供基本執行緒阻塞和喚醒原語,AQS同步器和鎖都是使用它來阻塞和喚醒執行緒。主要原始碼如下:
public class LockSupport { /** * 阻塞操作:利用park阻塞某個執行緒(指定引數) */ public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0 L); setBlocker(t, null); } /** * 阻塞操作:利用park阻塞某個執行緒(無指定引數) */ public static void park() { UNSAFE.park(false, 0 L); } /** * 阻塞操作:根據nanos許可,利用parkNanos阻塞某個執行緒 */ public static void parkNanos(long nanos) { if (nanos > 0) UNSAFE.park(false, nanos); } /** * 阻塞操作:根據nanos許可,利用parkNanos阻塞某個執行緒,但是指定阻塞物件 */ public static void parkNanos(Object blocker, long nanos) { if (nanos > 0) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, nanos); setBlocker(t, null); } } /** * 阻塞操作:根據deadline最大等待時間,利用parkUntil阻塞某個執行緒 */ public static void parkUntil(long deadline) { UNSAFE.park(true, deadline); } /** * 阻塞操作:根據deadline最大等待時間,利用parkUntil阻塞某個執行緒,需要指定阻塞物件 */ public static void parkUntil(Object blocker, long deadline) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(true, deadline); setBlocker(t, null); } /** * 喚醒操作:利用unpark喚醒某個執行緒 */ public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } /** * 設定阻塞器: 指定執行緒和物件 */ private static void setBlocker(Thread t, Object arg) { // Even though volatile, hotspot doesn't need a write barrier here. UNSAFE.putObject(t, parkBlockerOffset, arg); } /** * 獲取阻塞器中執行緒物件 */ public static Object getBlocker(Thread t) { if (t == null) throw new NullPointerException(); return UNSAFE.getObjectVolatile(t, parkBlockerOffset); } /** * 獲取阻塞器中執行緒物件 */ static final int nextSecondarySeed() { int r; Thread t = Thread.currentThread(); if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) { r ^= r << 13; // xorshift r ^= r >>> 17; r ^= r << 5; } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) r = 1; // avoid zero UNSAFE.putInt(t, SECONDARY, r); return r; } // Hotspot implementation via intrinsics API private static final sun.misc.Unsafe UNSAFE; private static final long parkBlockerOffset; private static final long SEED; private static final long PROBE; private static final long SECONDARY; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class <? > tk = Thread.class; parkBlockerOffset = UNSAFE.objectFieldOffset(tk.getDeclaredField("parkBlocker")); SEED = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSeed")); PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe")); SECONDARY = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSecondarySeed")); } catch (Exception ex) { throw new Error(ex); } } }
- 設計思想: 相比用java.lang.Object類中的wait/notify方式,其LockSupport類更關注執行緒本身,解耦了執行緒之間的同步。
- 實現原理: 主要還是使用Unsafe類來執行底層操作,主要是間接呼叫是park和unpark本地方法
- 阻塞操作涉及方法: 一般以park開頭的方法來阻塞執行緒操作,大致可以分為自定義阻塞物件引數和非自定義阻塞物件引數等阻塞方法
- 喚醒執行緒操作: 主要通過unpark方法來對當前執行緒設定可用,相對於喚醒操作
三. Condition介面的設計與實現
在Java領域中,Condition介面是用來實現管程技術,其中 Condition用於解決同步問題。
相對於LockSupport的設計與實現來說,Condition介面只是在JDK層面對於阻塞和喚醒提供了一個模板的定義,是AQS基礎同步器中條件佇列的定義,而ConditionObject是在AQS基礎同步器具體實現。
對於Condition介面而言,是提供了可替代wait/notify機制的條件佇列模式,其中:
public interface Condition { /** * 條件佇列模式:等待await操作 */ void await() throws InterruptedException; /** * 條件佇列模式:等待awaitUninterruptibly操作,可中斷模式 */ void awaitUninterruptibly(); /** * 條件佇列模式:等待awaitNanos操作,可超時模式 */ long awaitNanos(long nanosTimeout) throws InterruptedException; /** * 條件佇列模式:等待await操作,可超時模式 */ boolean await(long time, TimeUnit unit) throws InterruptedException; /** *條件佇列模式:等待awaitUntil操作,可超時模式 */ boolean awaitUntil(Date deadline) throws InterruptedException; /** * 條件佇列模式:通知signal操作 */ void signal(); /** * 條件佇列模式:通知signalAll操作 */ void signalAll(); }
- 定義了關於“等待(wait)”機制的相關實現:而以await開頭的所有方法都是關於等待機制的定義。
- 定義了關於“通知(signal)”機制的相關實現: signal()方法和signalAll()方法,其中 signal()方法是隨機地通知等待佇列中的一個執行緒,而signalAll()方法是通知等待佇列中的所有執行緒。
四. Lock介面的設計與實現
在Java領域中,Lock介面是用來實現管程技術,其中 Lock 用於解決互斥問題。
Lock介面位於java.util.concurrent.locks包中,是JUC顯式鎖的一個抽象,Lock介面的主要抽象方法:
public interface Lock { /** * Lock介面-獲取鎖 */ void lock(); /** * Lock介面-獲取鎖(可中斷) */ void lockInterruptibly() throws InterruptedException; /** * Lock介面-嘗試獲取鎖 * */ boolean tryLock(); /** *Lock介面-嘗試獲取鎖(支援超時) */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** *Lock介面-釋放鎖 * */ void unlock(); /** * Lock介面-設定條件變數 */ Condition newCondition(); }
- 獲取鎖: lock()
- 釋放鎖:unlock()
- 條件變數: Condition
在JDK中,對於Lock介面的具體實現主要是ReentrantLock類,其中:
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699 L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * 構造鎖的非公平模式(預設模式) */ public ReentrantLock() { sync = new NonfairSync(); } /** * 構造鎖的公平和非公平模式(可選公平或者非公平) */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } /** * Lock介面-實現嘗試獲取鎖 */ public void lock() { sync.lock(); } /** * Lock介面-實現嘗試獲取鎖 */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * Lock介面-實現嘗試獲取鎖 */ public boolean tryLock() { return sync.nonfairTryAcquire(1); } /** * Lock介面-實現嘗試獲取鎖 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } /** * Lock介面-釋放鎖 */ public void unlock() { sync.release(1); } /** * Lock介面-建立條件變數 */ public Condition newCondition() { return sync.newCondition(); } }
- 包含了一個同步器Sync,主要是基於AbstractQueuedSynchronizer實現,同時基於Sync類還實現FairSync類和NonfairSync類,其中FairSync類對應著公平模式,NonfairSync類對應非公平模式。
- 實現Lock介面設計和定義的相關方法,可以設定其鎖是公平和非公的,預設是非公平模式的。
相對於Java內建鎖,Java SDK 併發包裡的 Lock介面主要區別有能夠響應中斷、支援超時和非阻塞地獲取鎖等三個特性。
五. ReadWriteLock介面的設計與實現
在Java領域中,ReadWriteLock介面主要是基於Lock介面來封裝了ReadLock鎖和WriteLock鎖等2種鎖的實現方法,其中ReadLock鎖是讀鎖的介面定義,WriteLock鎖是寫鎖的介面定義。
ReadWriteLock介面的內部實現,主要是基於Lock介面來獲取ReadLock鎖和WriteLock鎖的,其具體程式碼如下:
public interface ReadWriteLock { /** * ReadWriteLock介面-基於Lock介面實現ReadLock */ Lock readLock(); /** * ReadWriteLock介面-基於Lock介面實現WriteLock */ Lock writeLock(); }
- readLock()方法: 獲取讀鎖,主要是基於基於Lock介面來定義,表示著其具體實現類都會基於AQS基礎同步器實現
- writeLock()方法:獲取寫鎖,主要是基於基於Lock介面來定義,表示著其具體實現類都會基於AQS基礎同步器實現
在JDK中,對於ReadWriteLock介面的具體實現主要是ReentrantReadWriteLock類,其中:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private static final long serialVersionUID = -6992448646407690164 L; /** Inner class providing readlock */ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; /** * 同步器-基於AbstractQueuedSynchronizer實現 */ abstract static class Sync extends AbstractQueuedSynchronizer { //..... abstract boolean readerShouldBlock(); abstract boolean writerShouldBlock(); } /** * 同步器-非公平模式 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037 L; final boolean writerShouldBlock() { return false; } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } } /** * 同步器-公平模式 */ static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451 L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } } /** * 構造鎖的非公平模式(預設預設) */ public ReentrantReadWriteLock() { this(false); } /** * 構造鎖的公平和非公平模式(可選公平或者非公平) */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } /** * ReadWriteLock介面-基於Lock具體實現的writeLock() */ public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } /** * ReadWriteLock介面-基於Lock具體實現的readLock() */ public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } /** * ReadWriteLock介面-ReadLock內建類 */ public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164 L; private final Sync sync; //..... } /** * ReadWriteLock介面-WriteLock內建類 */ public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164 L; private final Sync sync; //..... } }
- 包含了一個同步器Sync,主要是基於AbstractQueuedSynchronizer實現,同時基於Sync類還實現FairSync類和NonfairSync類,其中FairSync類對應著公平模式,NonfairSync類對應非公平模式。
- 主要是內建定義了ReadLock類和WriteLock類等兩個內部類,其中ReadLock類為讀鎖,而而WriteLock類為寫鎖,可以設定其鎖是公平和非公的,預設是非公平模式的。
- 實現了ReadWriteLock介面,通過內部類的定義來具體實現對應的鎖,其中ReadLock類對應readLock()方法,而WriteLock類對應writeLock()方法。
綜上所述,從JDK中對於ReadWriteLock介面的具體實現來看,我們不難發現,ReadWriteLock介面是實現讀寫鎖一種模板定義,我們可以基於這個介面來實現滿足我們實際業務需求的讀寫鎖。
寫在最後
在併發程式設計領域,有兩大核心問題:一個是互斥,即同一時刻只允許一個執行緒訪問共享資源;另一個是同步,即執行緒之間如何通訊、協作。
主要原因是,對於多執行緒實現實現併發,一直以來,多執行緒都存在2個問題:
- 執行緒之間記憶體共享,需要通過加鎖進行控制,但是加鎖會導致效能下降,同時複雜的加鎖機制也會增加程式設計編碼難度
- 過多執行緒造成執行緒之間的上下文切換,導致效率低下
因此,在併發程式設計領域中,一直有一個很重要的設計原則: “ 不要通過記憶體共享來實現通訊,而應該通過通訊來實現記憶體共享。”
簡單來說,就是儘可能通過訊息通訊,而不是記憶體共享來實現程序或者執行緒之間的同步。
最後,技術研究之路任重而道遠,願我們熬的每一個通宵,都撐得起我們想在這條路上走下去的勇氣,未來仍然可期,與各位程式程式設計君共勉!
版權宣告:本文為博主原創文章,遵循相關版權協議,如若轉載或者分享請附上原文出處連結和連結來源。
- 天翼雲全場景業務無縫替換至國產原生作業系統CTyunOS!
- 以羊了個羊為例,淺談小程式抓包與響應報文修改
- 這幾種常見的 JVM 調優場景,你知道嗎?
- 如此狂妄,自稱高效能佇列的Disruptor有啥來頭?
- 為什麼要學習GoF設計模式?
- 827. 最大人工島 : 簡單「並查集 列舉」運用題
- 手把手教你如何使用 Timestream 實現物聯網時序資料儲存和分析
- 850. 矩形面積 II : 掃描線模板題
- Java 併發程式設計解析 | 基於JDK原始碼解析Java領域中的併發鎖,我們可以從中學習到什麼內容?
- 【手把手】光說不練假把式,這篇全鏈路壓測實踐探索
- 大廠鍾愛的全鏈路壓測有什麼意義?四種壓測方案詳細對比分析
- 寫個續集,填坑來了!關於“Thread.sleep(0)這一行‘看似無用’的程式碼”裡面留下的坑。
- 857. 僱傭 K 名工人的最低成本 : 列舉 優先佇列(堆)運用題
- Vue3 實現一個自定義toast(小彈窗)
- 669. 修剪二叉搜尋樹 : 常規樹的遍歷與二叉樹性質
- 讀完 RocketMQ 原始碼,我學會了如何優雅的建立執行緒
- 效能調優——小小的log大大的坑
- 1582. 二進位制矩陣中的特殊位置 : 簡單模擬題
- elementui原始碼學習之仿寫一個el-switch
- 646. 最長數對鏈 : 常規貪心 DP 運用題