Java 併發程式設計(六)之 CAS, Unsafe和原子類詳解

語言: CN / TW / HK

帶著BAT大廠的面試問題去理解

請帶著這些問題繼續後文,會很大程度上幫助你更好的理解相關知識點。

  • 執行緒安全的實現方法有哪些?
  • 什麼是CAS?
  • CAS使用示例,結合AtomicInteger給出示例?
  • CAS會有哪些問題?
  • 針對這這些問題,Java提供了哪幾個解決的?
  • AtomicInteger底層實現? CAS+volatile
  • 請闡述你對Unsafe類的理解?
  • 說說你對Java原子類的理解? 包含13個,4組分類,說說作用和使用場景。
  • AtomicStampedReference是什麼?
  • AtomicStampedReference是怎麼解決ABA的? 內部使用Pair來儲存元素值及其版本號
  • java中還有哪些類可以解決ABA的問題? AtomicMarkableReference

CAS

前面我們說到,執行緒安全的實現方法包含:

  • 互斥同步: synchronized 和 ReentrantLock
  • 非阻塞同步: CAS, AtomicXXXX
  • 無同步方案: 棧封閉,Thread Local,可重入程式碼

具體可以參看:執行緒安全的實現方法,這裡我們將對CAS重點闡釋。

什麼是CAS

CAS的全稱為Compare-And-Swap,直譯就是對比交換。是一條CPU的原子指令,其作用是讓CPU先進行比較兩個值是否相等,然後原子地更新某個位置的值,經過調查發現,其實現方式是基於硬體平臺的彙編指令,就是說CAS是靠硬體實現的,JVM只是封裝了彙編呼叫,那些AtomicInteger類便是使用了這些封裝後的介面。   簡單解釋:CAS操作需要輸入兩個數值,一箇舊值(期望操作前的值)和一個新值,在操作期間先比較下在舊值有沒有發生變化,如果沒有發生變化,才交換成新值,發生了變化則不交換。

CAS操作是原子性的,所以多執行緒併發使用CAS更新資料時,可以不使用鎖。JDK中大量使用了CAS來更新資料而防止加鎖(synchronized 重量級鎖)來保持原子更新。

相信sql大家都熟悉,類似sql中的條件更新一樣:update set id=3 from table where id=2。因為單條sql執行具有原子性,如果有多個執行緒同時執行此sql語句,只有一條能更新成功。

CAS使用示例

如果不使用CAS,在高併發下,多執行緒同時修改一個變數的值我們需要synchronized加鎖(可能有人說可以用Lock加鎖,Lock底層的AQS也是基於CAS進行獲取鎖的)。

public class Test { private int i=0; public synchronized int add(){ return i++; } }

java中為我們提供了AtomicInteger 原子類(底層基於CAS進行更新資料的),不需要加鎖就在多執行緒併發場景下實現資料的一致性。

public class Test { private AtomicInteger i = new AtomicInteger(0); public int add(){ return i.addAndGet(1); } }

CAS 問題

CAS 方式為樂觀鎖,synchronized 為悲觀鎖。因此使用 CAS 解決併發問題通常情況下效能更優。

但使用 CAS 方式也會有幾個問題:

ABA問題

因為CAS需要在操作值的時候,檢查值有沒有發生變化,比如沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時則會發現它的值沒有發生變化,但是實際上卻變化了。

ABA問題的解決思路就是使用版本號。在變數前面追加上版本號,每次變數更新的時候把版本號加1,那麼A->B->A就會變成1A->2B->3A。

從Java 1.5開始,JDK的Atomic包裡提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法的作用是首先檢查當前引用是否等於預期引用,並且檢查當前標誌是否等於預期標誌,如果全部相等,則以原子方式將該引用和該標誌的值設定為給定的更新值。

迴圈時間長開銷大

自旋CAS如果長時間不成功,會給CPU帶來非常大的執行開銷。如果JVM能支援處理器提供的pause指令,那麼效率會有一定的提升。pause指令有兩個作用:第一,它可以延遲流水線執行命令(de-pipeline),使CPU不會消耗過多的執行資源,延遲的時間取決於具體實現的版本,在一些處理器上延遲時間是零;第二,它可以避免在退出迴圈的時候因記憶體順序衝突(Memory Order Violation)而引起CPU流水線被清空(CPU Pipeline Flush),從而提高CPU的執行效率。

只能保證一個共享變數的原子操作

當對一個共享變數執行操作時,我們可以使用迴圈CAS的方式來保證原子操作,但是對多個共享變數操作時,迴圈CAS就無法保證操作的原子性,這個時候就可以用鎖。

還有一個取巧的辦法,就是把多個共享變數合併成一個共享變數來操作。比如,有兩個共享變數i = 2,j = a,合併一下ij = 2a,然後用CAS來操作ij。

從Java 1.5開始,JDK提供了AtomicReference類來保證引用物件之間的原子性,就可以把多個變數放在一個物件裡來進行CAS操作。

UnSafe類詳解

上文我們瞭解到Java原子類是通過UnSafe類實現的,這節主要分析下UnSafe類。UnSafe類在J.U.C中CAS操作有很廣泛的應用。

Unsafe是位於sun.misc包下的一個類,主要提供一些用於執行低級別、不安全操作的方法,如直接訪問系統記憶體資源、自主管理記憶體資源等,這些方法在提升Java執行效率、增強Java語言底層資源操作能力方面起到了很大的作用。但由於Unsafe類使Java語言擁有了類似C語言指標一樣操作記憶體空間的能力,這無疑也增加了程式發生相關指標問題的風險。在程式中過度、不正確使用Unsafe類會使得程式出錯的概率變大,使得Java這種安全的語言變得不再“安全”,因此對Unsafe的使用一定要慎重。

這個類儘管裡面的方法都是 public 的,但是並沒有辦法使用它們,JDK API 文件也沒有提供任何關於這個類的方法的解釋。總而言之,對於 Unsafe 類的使用都是受限制的,只有授信的程式碼才能獲得該類的例項,當然 JDK 庫裡面的類是可以隨意使用的。

先來看下這張圖,對UnSafe類總體功能:

圖片.png

如上圖所示,Unsafe提供的API大致可分為記憶體操作、CAS、Class相關、物件操作、執行緒排程、系統資訊獲取、記憶體屏障、陣列操作等幾類,下面將對其相關方法和應用場景進行詳細介紹。

Unsafe與CAS

反編譯出來的程式碼:

``` public final int getAndAddInt(Object paramObject, long paramLong, int paramInt) { int i; do i = getIntVolatile(paramObject, paramLong); while (!compareAndSwapInt(paramObject, paramLong, i, i + paramInt)); return i; }

public final long getAndAddLong(Object paramObject, long paramLong1, long paramLong2) { long l; do l = getLongVolatile(paramObject, paramLong1); while (!compareAndSwapLong(paramObject, paramLong1, l, l + paramLong2)); return l; }

public final int getAndSetInt(Object paramObject, long paramLong, int paramInt) { int i; do i = getIntVolatile(paramObject, paramLong); while (!compareAndSwapInt(paramObject, paramLong, i, paramInt)); return i; }

public final long getAndSetLong(Object paramObject, long paramLong1, long paramLong2) { long l; do l = getLongVolatile(paramObject, paramLong1); while (!compareAndSwapLong(paramObject, paramLong1, l, paramLong2)); return l; }

public final Object getAndSetObject(Object paramObject1, long paramLong, Object paramObject2) { Object localObject; do localObject = getObjectVolatile(paramObject1, paramLong); while (!compareAndSwapObject(paramObject1, paramLong, localObject, paramObject2)); return localObject; } ```

從原始碼中發現,內部使用自旋的方式進行CAS更新(while迴圈進行CAS更新,如果更新失敗,則迴圈再次重試)。

又從Unsafe類中發現,原子操作其實只支援下面三個方法。

``` public final native boolean compareAndSwapObject(Object paramObject1, long paramLong, Object paramObject2, Object paramObject3);

public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);

public final native boolean compareAndSwapLong(Object paramObject, long paramLong1, long paramLong2, long paramLong3); ```

我們發現Unsafe只提供了3種CAS方法:compareAndSwapObject、compareAndSwapInt和compareAndSwapLong。都是native方法。

Unsafe底層

不妨再看看Unsafe的compareAndSwap*方法來實現CAS操作,它是一個本地方法,實現位於unsafe.cpp中。

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); oop p = JNIHandles::resolve(obj); jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END

可以看到它通過 Atomic::cmpxchg 來實現比較和替換操作。其中引數x是即將更新的值,引數e是原記憶體的值。

如果是Linux的x86,Atomic::cmpxchg方法的實現如下:

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; }

而windows的x86的實現如下:

``` inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { int mp = os::isMP(); //判斷是否是多處理器 _asm { mov edx, dest mov ecx, exchange_value mov eax, compare_value LOCK_IF_MP(mp) cmpxchg dword ptr [edx], ecx } }

// Adding a lock prefix to an instruction on MP machine // VC++ doesn't like the lock prefix to be on a single line // so we can't insert a label after the lock prefix. // By emitting a lock prefix, we can define a label after it. #define LOCK_IF_MP(mp) __asm cmp mp, 0 \ __asm je L0 \ __asm _emit 0xF0 \ __asm L0: ```

如果是多處理器,為cmpxchg指令新增lock字首。反之,就省略lock字首(單處理器會不需要lock字首提供的記憶體屏障效果)。這裡的lock字首就是使用了處理器的匯流排鎖(最新的處理器都使用快取鎖代替匯流排鎖來提高效能)。

cmpxchg(void* ptr, int old, int new),如果ptr和old的值一樣,則把new寫到ptr記憶體,否則返回ptr的值,整個操作是原子的。在Intel平臺下,會用lock cmpxchg來實現,使用lock觸發快取鎖,這樣另一個執行緒想訪問ptr的記憶體,就會被block住。

Unsafe其它功能

Unsafe 提供了硬體級別的操作,比如說獲取某個屬性在記憶體中的位置,比如說修改物件的欄位值,即使它是私有的。不過 Java 本身就是為了遮蔽底層的差異,對於一般的開發而言也很少會有這樣的需求。

舉兩個例子,比方說:

public native long staticFieldOffset(Field paramField);

這個方法可以用來獲取給定的 paramField 的記憶體地址偏移量,這個值對於給定的 field 是唯一的且是固定不變的。

再比如說:

public native int arrayBaseOffset(Class paramClass); public native int arrayIndexScale(Class paramClass);

前一個方法是用來獲取陣列第一個元素的偏移地址,後一個方法是用來獲取陣列的轉換因子即陣列中元素的增量地址的。

最後看三個方法:

public native long allocateMemory(long paramLong); public native long reallocateMemory(long paramLong1, long paramLong2); public native void freeMemory(long paramLong);

分別用來分配記憶體,擴充記憶體和釋放記憶體的。

更多相關功能,推薦你看下這篇文章:來自美團技術團隊:Java魔法類:Unsafe應用解析在新視窗開啟

AtomicInteger

使用舉例

以 AtomicInteger 為例,常用 API:

public final int get():獲取當前的值 public final int getAndSet(int newValue):獲取當前的值,並設定新的值 public final int getAndIncrement():獲取當前的值,並自增 public final int getAndDecrement():獲取當前的值,並自減 public final int getAndAdd(int delta):獲取當前的值,並加上預期的值 void lazySet(int newValue): 最終會設定成newValue,使用lazySet設定值後,可能導致其他執行緒在之後的一小段時間內還是可以讀到舊的值。

相比 Integer 的優勢,多執行緒中讓變數自增:

private volatile int count = 0; // 若要執行緒安全執行執行 count++,需要加鎖 public synchronized void increment() { count++; } public int getCount() { return count; }

使用 AtomicInteger 後:

private AtomicInteger count = new AtomicInteger(); public void increment() { count.incrementAndGet(); } // 使用 AtomicInteger 後,不需要加鎖,也可以實現執行緒安全 public int getCount() { return count.get(); }

原始碼解析

``` public class AtomicInteger extends Number implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { //用於獲取value欄位相對當前物件的“起始地址”的偏移量 valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } }

private volatile int value;

//返回當前值
public final int get() {
    return value;
}

//遞增加detla
public final int getAndAdd(int delta) {
    //三個引數,1、當前的例項 2、value例項變數的偏移量 3、當前value要加上的數(value+delta)。
    return unsafe.getAndAddInt(this, valueOffset, delta);
}

//遞增加1
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

... } ```

我們可以看到 AtomicInteger 底層用的是volatile的變數和CAS來進行更改資料的。

  • volatile保證執行緒的可見性,多執行緒併發時,一個執行緒修改資料,可以保證其它執行緒立馬看到修改後的值
  • CAS 保證資料更新的原子性。

延伸到所有原子類:共12個

JDK中提供了12個原子操作類。

原子更新基本型別

使用原子的方式更新基本型別,Atomic包提供了以下3個類。

  • AtomicBoolean: 原子更新布林型別。
  • AtomicInteger: 原子更新整型。
  • AtomicLong: 原子更新長整型。

以上3個類提供的方法幾乎一模一樣,可以參考上面AtomicInteger中的相關方法。

原子更新陣列

通過原子的方式更新數組裡的某個元素,Atomic包提供了以下的3個類:

  • AtomicIntegerArray: 原子更新整型數組裡的元素。
  • AtomicLongArray: 原子更新長整型數組裡的元素。
  • AtomicReferenceArray: 原子更新引用型別數組裡的元素。

這三個類的最常用的方法是如下兩個方法:

  • get(int index):獲取索引為index的元素值。
  • compareAndSet(int i,E expect,E update): 如果當前值等於預期值,則以原子方式將陣列位置i的元素設定為update值。

舉個AtomicIntegerArray例子:

``` import java.util.concurrent.atomic.AtomicIntegerArray;

public class Demo5 { public static void main(String[] args) throws InterruptedException { AtomicIntegerArray array = new AtomicIntegerArray(new int[] { 0, 0 }); System.out.println(array); System.out.println(array.getAndAdd(1, 2)); System.out.println(array); } } ```

輸出結果:

[0, 0] 0 [0, 2]

原子更新引用型別

Atomic包提供了以下三個類:

  • AtomicReference: 原子更新引用型別。
  • AtomicStampedReference: 原子更新引用型別, 內部使用Pair來儲存元素值及其版本號。
  • AtomicMarkableReferce: 原子更新帶有標記位的引用型別。

這三個類提供的方法都差不多,首先構造一個引用物件,然後把引用物件set進Atomic類,然後呼叫compareAndSet等一些方法去進行原子操作,原理都是基於Unsafe實現,但AtomicReferenceFieldUpdater略有不同,更新的欄位必須用volatile修飾。

舉個AtomicReference例子:

``` import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {

public static void main(String[] args){

    // 建立兩個Person物件,它們的id分別是101和102。
    Person p1 = new Person(101);
    Person p2 = new Person(102);
    // 新建AtomicReference物件,初始化它的值為p1物件
    AtomicReference ar = new AtomicReference(p1);
    // 通過CAS設定ar。如果ar的值為p1的話,則將其設定為p2。
    ar.compareAndSet(p1, p2);

    Person p3 = (Person)ar.get();
    System.out.println("p3 is "+p3);
    System.out.println("p3.equals(p1)="+p3.equals(p1));
}

}

class Person { volatile long id; public Person(long id) { this.id = id; } public String toString() { return "id:"+id; } } ```

結果輸出:

p3 is id:102 p3.equals(p1)=false

結果說明:

  • 新建AtomicReference物件ar時,將它初始化為p1。
  • 緊接著,通過CAS函式對它進行設定。如果ar的值為p1的話,則將其設定為p2。
  • 最後,獲取ar對應的物件,並列印結果。p3.equals(p1)的結果為false,這是因為Person並沒有覆蓋equals()方法,而是採用繼承自Object.java的equals()方法;而Object.java中的equals()實際上是呼叫"=="去比較兩個物件,即比較兩個物件的地址是否相等。

原子更新欄位類

Atomic包提供了四個類進行原子欄位更新:

  • AtomicIntegerFieldUpdater: 原子更新整型的欄位的更新器。
  • AtomicLongFieldUpdater: 原子更新長整型欄位的更新器。
  • AtomicReferenceFieldUpdater: 上面已經說過此處不在贅述。

這四個類的使用方式都差不多,是基於反射的原子更新欄位的值。要想原子地更新欄位類需要兩步:

  • 第一步,因為原子更新欄位類都是抽象類,每次使用的時候必須使用靜態方法newUpdater()建立一個更新器,並且需要設定想要更新的類和屬性。
  • 第二步,更新類的欄位必須使用public volatile修飾。

舉個例子:

``` public class TestAtomicIntegerFieldUpdater {

public static void main(String[] args){
    TestAtomicIntegerFieldUpdater tIA = new TestAtomicIntegerFieldUpdater();
    tIA.doIt();
}

public AtomicIntegerFieldUpdater<DataDemo> updater(String name){
    return AtomicIntegerFieldUpdater.newUpdater(DataDemo.class,name);

}

public void doIt(){
    DataDemo data = new DataDemo();
    System.out.println("publicVar = "+updater("publicVar").getAndAdd(data, 2));
    /*
        * 由於在DataDemo類中屬性value2/value3,在TestAtomicIntegerFieldUpdater中不能訪問
        * */
    //System.out.println("protectedVar = "+updater("protectedVar").getAndAdd(data,2));
    //System.out.println("privateVar = "+updater("privateVar").getAndAdd(data,2));

    //System.out.println("staticVar = "+updater("staticVar").getAndIncrement(data));//報java.lang.IllegalArgumentException
    /*
        * 下面報異常:must be integer
        * */
    //System.out.println("integerVar = "+updater("integerVar").getAndIncrement(data));
    //System.out.println("longVar = "+updater("longVar").getAndIncrement(data));
}

}

class DataDemo{ public volatile int publicVar=3; protected volatile int protectedVar=4; private volatile int privateVar=5;

public volatile static int staticVar = 10;
//public  final int finalVar = 11;

public volatile Integer integerVar = 19;
public volatile Long longVar = 18L;

} ```

再說下對於AtomicIntegerFieldUpdater 的使用稍微有一些限制和約束,約束如下:

  • 欄位必須是volatile型別的,線上程之間共享變數時保證立即可見.eg:volatile int value = 3
  • 欄位的描述型別(修飾符public/protected/default/private)是與呼叫者與操作物件欄位的關係一致。也就是說呼叫者能夠直接操作物件欄位,那麼就可以反射進行原子操作。但是對於父類的欄位,子類是不能直接操作的,儘管子類可以訪問父類的欄位。
  • 只能是例項變數,不能是類變數,也就是說不能加static關鍵字。
  • 只能是可修改變數,不能使final變數,因為final的語義就是不可修改。實際上final的語義和volatile是有衝突的,這兩個關鍵字不能同時存在。
  • 對於AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long型別的欄位,不能修改其包裝型別(Integer/Long)。如果要修改包裝型別就需要使用AtomicReferenceFieldUpdater。

再講講AtomicStampedReference解決CAS的ABA問題

AtomicStampedReference解決ABA問題

AtomicStampedReference主要維護包含一個物件引用以及一個可以自動更新的整數"stamp"的pair物件來解決ABA問題。

``` public class AtomicStampedReference { private static class Pair { final T reference; //維護物件引用 final int stamp; //用於標誌版本 private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static Pair of(T reference, int stamp) { return new Pair(reference, stamp); } } private volatile Pair pair; ....

/**
  * expectedReference :更新之前的原始值
  * newReference : 將要更新的新值
  * expectedStamp : 期待更新的標誌版本
  * newStamp : 將要更新的標誌版本
  */
public boolean compareAndSet(V   expectedReference,
                         V   newReference,
                         int expectedStamp,
                         int newStamp) {
    // 獲取當前的(元素值,版本號)對
    Pair<V> current = pair;
    return
        // 引用沒變
        expectedReference == current.reference &&
        // 版本號沒變
        expectedStamp == current.stamp &&
        // 新引用等於舊引用
        ((newReference == current.reference &&
        // 新版本號等於舊版本號
        newStamp == current.stamp) ||
        // 構造新的Pair物件並CAS更新
        casPair(current, Pair.of(newReference, newStamp)));
}

private boolean casPair(Pair<V> cmp, Pair<V> val) {
    // 呼叫Unsafe的compareAndSwapObject()方法CAS更新pair的引用為新引用
    return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

```

  • 如果元素值和版本號都沒有變化,並且和新的也相同,返回true;
  • 如果元素值和版本號都沒有變化,並且和新的不完全相同,就構造一個新的Pair物件並執行CAS更新pair。

可以看到,java中的實現跟我們上面講的ABA的解決方法是一致的。

  • 首先,使用版本號控制;
  • 其次,不重複使用節點(Pair)的引用,每次都新建一個新的Pair來作為CAS比較的物件,而不是複用舊的;
  • 最後,外部傳入元素值及版本號,而不是節點(Pair)的引用。

使用舉例

``` private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference<>(1, 0); public static void main(String[] args){ Thread main = new Thread(() -> { System.out.println("操作執行緒" + Thread.currentThread() +",初始值 a = " + atomicStampedRef.getReference()); int stamp = atomicStampedRef.getStamp(); //獲取當前標識別 try { Thread.sleep(1000); //等待1秒 ,以便讓干擾執行緒執行 } catch (InterruptedException e) { e.printStackTrace(); } boolean isCASSuccess = atomicStampedRef.compareAndSet(1,2,stamp,stamp +1); //此時expectedReference未發生改變,但是stamp已經被修改了,所以CAS失敗 System.out.println("操作執行緒" + Thread.currentThread() +",CAS操作結果: " + isCASSuccess); },"主操作執行緒");

Thread other = new Thread(() -> {
    Thread.yield(); // 確保thread-main 優先執行

atomicStampedRef.compareAndSet(1,2,atomicStampedRef.getStamp(),atomicStampedRef.getStamp() +1); System.out.println("操作執行緒" + Thread.currentThread() +",【increment】 ,值 = "+ atomicStampedRef.getReference()); atomicStampedRef.compareAndSet(2,1,atomicStampedRef.getStamp(),atomicStampedRef.getStamp() +1); System.out.println("操作執行緒" + Thread.currentThread() +",【decrement】 ,值 = "+ atomicStampedRef.getReference()); },"干擾執行緒");

main.start();
other.start();

} ```

輸出結果:

``` // 輸出

操作執行緒Thread[主操作執行緒,5,main],初始值 a = 2 操作執行緒Thread[干擾執行緒,5,main],【increment】 ,值 = 2 操作執行緒Thread[干擾執行緒,5,main],【decrement】 ,值 = 1 操作執行緒Thread[主操作執行緒,5,main],CAS操作結果: false ```

java中還有哪些類可以解決ABA的問題?

AtomicMarkableReference,它不是維護一個版本號,而是維護一個boolean型別的標記,標記值有修改,瞭解一下。