Unsafe 魔法類應用體現 (LockSupport)

語言: CN / TW / HK

政採雲技術團隊.png

作者-斜照

前言

​ 隨著科技的進步,計算機硬體和算力也迎來跨越式的發展,從單核 CPU 序列執行計算任務到如今多核 CPU 並行執行任務。

為了充分利用多核資源,併發程式設計技術應時代出現。

​ 併發程式設計在提高程式執行效率的同時,也帶來了技術上的挑戰,比如多執行緒之間如何通訊,常見的解決方案是通過管道或者共享記憶體。Java 作為比較流行的程式語言,自然也是要順應併發的潮流,Java 採用共享記憶體方式保障多執行緒之間的通訊。

​ 使用共享記憶體通訊,也帶來了併發的相關問題:可見性、原子性、有序性。

​ 其中原子性則是由執行緒切換導致的併發問題,執行緒如何排程也是一門精細活,本文則從執行緒阻塞和喚醒的排程角度,來探討和揭開一部分 Unsafe 魔法類的面紗。

阻塞和喚醒

​ 藝術來源於生活,一些技術上的解決方案也可以體現在日常生活中。

​ 如社群醫院的就診過程,患者取號->醫生叫號->診斷->檢查室檢查->拿到結果複診->診斷結束-叫下一位,把醫生和患者看做執行緒和任務,則等待患者和診斷則是阻塞和喚醒的場景。

​ 基於該場景,做一下程式碼實現:

```java public class test07 { public static void main(String[] args) { List list = Arrays.asList( new Patient(Check.BRAIN.getType()), new Patient(Check.EYES.getType()), new Patient(Check.OTHER.getType()) ); ExecutorService executorService = Executors.newCachedThreadPool(); for (Patient patient : list) { executorService.submit(new Doctor(patient)); executorService.submit(new SpecialExamination(patient)); } executorService.shutdown(); System.out.println("ok"); } }

class Doctor implements Runnable {

private Patient patient;

public Doctor(Patient patient) {
    this.patient = patient;
}

public void run() {
    /**
     * 1.檢查是否掛號
     * 2.是否需要檢查原因
     * 3.處理其他患者
     * */
    synchronized (this.patient) {
        while (null == patient.getReason()) {
            System.out.println("醫生" + Thread.currentThread().getName() + "單號" + patient.getNumber() + "需要進行檢查");
            try {
                patient.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("醫生" + Thread.currentThread().getName() + "開始治療單號" + patient.getNumber() + "型別" + patient.getType() + "原因" + patient.getReason());
        return;
    }
}

}

class SpecialExamination implements Runnable {

private Patient patient;

public SpecialExamination(Patient patient) {
    this.patient = patient;
}

public void run() {
    synchronized (patient) {
        if (patient.getType() == Check.BRAIN.getType()) {
            patient.setReason("腦子壞了");
        } else if (patient.getType() == Check.EYES.getType()) {
            patient.setReason("頭髮沒了");
        } else if (patient.getType() == Check.OTHER.getType()) {
            patient.setReason("其他原因");
        }
        System.out.println("通知單號" + patient.getNumber() + "可以開始會診");
        patient.notifyAll();
    }
}

}

enum Check { BRAIN(1), EYES(2), OTHER(3);

public int getType() {
    return type;
}

private int type;

Check(int type) {
    this.type = type;
}

}

class Patient { private String number; private int type; private String reason;

public Patient(int type) {
    this.number = UUID.randomUUID().toString();
    this.type = type;
}

public String getNumber() {
    return number;
}

public void setNumber(String number) {
    this.number = number;
}

public int getType() {
    return type;
}

public void setType(int type) {
    this.type = type;
}

public String getReason() {
    return reason;
}

public void setReason(String reason) {
    this.reason = reason;
}

}

執行結果: 醫生pool-1-thread-1單號284a4fe2-4692-47b7-8466-44b36a1d6265需要進行檢查 通知單號284a4fe2-4692-47b7-8466-44b36a1d6265已經拿到檢查結果可以開始會診 醫生pool-1-thread-1開始治療單號284a4fe2-4692-47b7-8466-44b36a1d6265型別1原因腦子壞了 醫生pool-1-thread-3單號76afe44b-2bb2-40e9-9201-ef4d24a95619需要進行檢查 通知單號76afe44b-2bb2-40e9-9201-ef4d24a95619已經拿到檢查結果可以開始會診 醫生pool-1-thread-3開始治療單號76afe44b-2bb2-40e9-9201-ef4d24a95619型別2原因眼鏡壞了 醫生pool-1-thread-5單號c98b41de-6ab8-41a7-8c11-9838eeacd73e需要進行檢查 通知單號c98b41de-6ab8-41a7-8c11-9838eeacd73e已經拿到檢查結果可以開始會診 分診結束 醫生pool-1-thread-5開始治療單號c98b41de-6ab8-41a7-8c11-9838eeacd73e型別3原因其他原因 ```

​ 由上程式碼輸出可以看到每個 "醫生" 都讓 "患者" 去做檢查,檢查完畢後再由當前 "醫生" 進行會診。

​ "醫生" 執行緒接待任務,然後任務流轉到檢查執行緒,"醫生" 執行緒阻塞等待檢查執行緒執行完畢後被喚醒執行剩下的事情。

​ 上面的實現藉助 wait & notify 來實現阻塞和喚醒,如果喚醒執行緒先執行,阻塞執行緒後執行,那麼被掛起的執行緒將永遠無法被喚醒,無法達到預期結果,所以需要引入鎖來保障執行順序。

​ 如果不借助 wait & notify 和鎖能實現阻塞和喚醒嗎?當然可以,下面看一下使用 LockSupport 如何達成效果吧。

```java public class test07 { public static void main(String[] args) { List list = Arrays.asList( new Patient(Check.BRAIN.getType()), new Patient(Check.EYES.getType()), new Patient(Check.OTHER.getType()) ); ExecutorService executorService = Executors.newCachedThreadPool(); for (Patient patient : list) { executorService.submit(new Doctor(patient)); executorService.submit(new SpecialExamination(patient)); } executorService.shutdown(); System.out.println("分診結束"); } }

class Doctor implements Runnable {

private Patient patient;

public Doctor(Patient patient) {
    this.patient = patient;
}

@Override
public void run() {
    /**
     * 1.檢查是否掛號
     * 2.是否需要檢查原因
     * 3.處理其他患者
     * */
    while (null == patient.getReason()) {
        System.out.println("醫生" + Thread.currentThread().getName() + "單號" + patient.getNumber() + "需要進行檢查");
        patient.setCurThread(Thread.currentThread());
        LockSupport.park(patient);
    }
    System.out.println("醫生" + Thread.currentThread().getName() + "開始治療單號" + patient.getNumber() + "型別" + patient.getType() + "原因" + patient.getReason());
    return;
}

}

class SpecialExamination implements Runnable {

private Patient patient;

public SpecialExamination(Patient patient) {
    this.patient = patient;
}

@Override
public void run() {
        if (patient.getType() == Check.BRAIN.getType()) {
            patient.setReason("腦子壞了");
        } else if (patient.getType() == Check.EYES.getType()) {
            patient.setReason("眼鏡壞了");
        } else if (patient.getType() == Check.OTHER.getType()) {
            patient.setReason("其他原因");
        }
        System.out.println("通知單號" + patient.getNumber() + "已經拿到檢查結果可以開始會診");
        LockSupport.unpark(patient.getCurThread());
}

}

enum Check { BRAIN(1), EYES(2), OTHER(3);

public int getType() {
    return type;
}

private int type;

Check(int type) {
    this.type = type;
}

}

class Patient { private String number; private int type; private String reason;

public Thread getCurThread() {
    return curThread;
}

public void setCurThread(Thread curThread) {
    this.curThread = curThread;
}

private Thread curThread;

public Patient(int type) {
    this.number = UUID.randomUUID().toString();
    this.type = type;
}

public String getNumber() {
    return number;
}

public void setNumber(String number) {
    this.number = number;
}

public int getType() {
    return type;
}

public void setType(int type) {
    this.type = type;
}

public String getReason() {
    return reason;
}

public void setReason(String reason) {
    this.reason = reason;
}

}

執行結果: 醫生pool-1-thread-1單號f1cd75b6-3b08-4d91-86c8-b2840d586b42需要進行檢查 通知單號f1cd75b6-3b08-4d91-86c8-b2840d586b42已經拿到檢查結果可以開始會診 醫生pool-1-thread-1開始治療單號f1cd75b6-3b08-4d91-86c8-b2840d586b42型別1原因腦子壞了 醫生pool-1-thread-3單號0a1dad26-9569-494f-8c87-6185aa453d76需要進行檢查 通知單號0a1dad26-9569-494f-8c87-6185aa453d76已經拿到檢查結果可以開始會診 醫生pool-1-thread-1單號56202b98-789f-4bd3-ba5d-c0809559b3e2需要進行檢查 醫生pool-1-thread-3開始治療單號0a1dad26-9569-494f-8c87-6185aa453d76型別2原因眼鏡壞了 分診結束 通知單號56202b98-789f-4bd3-ba5d-c0809559b3e2已經拿到檢查結果可以開始會診 醫生pool-1-thread-1開始治療單號56202b98-789f-4bd3-ba5d-c0809559b3e2型別3原因其他原因
```

​ 上程式碼塊做了一些改造將 "醫生" 執行緒物件與 "患者" 任務進行了繫結,使用 LockSupport 進行阻塞和喚醒,由結果可得,兩個 "醫生" 執行緒都有預期的表現。

​ LockSupport 在不使用鎖的情況下完成了執行緒的預期排程。它是如何做到的呢?下文嘗試從原始碼角度探討背後的隱祕。

```java public class LockSupport { private LockSupport() {} // Cannot be instantiated.

private static void setBlocker(Thread t, Object arg) {

    UNSAFE.putObject(t, parkBlockerOffset, arg);
}
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}
public static Object getBlocker(Thread t) {
    if (t == null)
        throw new NullPointerException();
    return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}
private static final sun.misc.Unsafe UNSAFE;
private static final long parkBlockerOffset;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> tk = Thread.class;
        parkBlockerOffset = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("parkBlocker"));
        SEED = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomSeed"));
        PROBE = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomProbe"));
        SECONDARY = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
    } catch (Exception ex) { throw new Error(ex); }
}

} ```

​ 上程式碼塊是精簡過後的相關原始碼,該類相關功能 API 如下:

LockSuppoort

​ 粗略閱讀了這四個方法,一頭霧水,LockSupport 內部定義了大量的變數,多處使用了 Unsafe 類,想要讀懂它, 要先打破它的鎧甲,通讀全類發現多處依賴了 Unsafe 這個靜態類,這個類是什麼作用呢,來探一探究竟。

Unsafe

什麼是 Unsafe 類?

​ Java 是一種高階抽象語言,直接針對於系統底層操作呼叫方式較少,有些情況需要直接訪問系統記憶體資源並自主管理資源,為了較高的安全性捨棄了指標概念,所以當某些情況需要提升Java執行效率增強計算機底層資源訪問能力,所以產生了 Unsafe 類,用來替代部分指標角色。

​ 由於 UNSAFE 可以直接操作計算機底層資源引起資源管理洩露問題,所以使用 Unsafe 時要嚴謹<對於 Unsafe 類的呼叫方式,本文一筆帶過,可憑藉興趣自行查閱>。

Unsafe 類可以做什麼呢?

​ Unsafe 類中提供的 API 大致作用分類:

記憶體操作、CAS、物件操作、系統相關、執行緒排程、陣列操作、Class相關、記憶體屏障。 Unsafe

​ 初步瞭解 Unsafe 這個魔法類後,發現它竟然提供了這麼豐富的內容,那麼再回頭讀懂 LockSupport 原始碼對一些變數的位移獲取和設定就不會一頭霧水了,可以做出如下解讀:

```java static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> tk = Thread.class; // 獲取當前執行緒物件parkBlocker欄位的位移偏移量 parkBlockerOffset = UNSAFE.objectFieldOffset (tk.getDeclaredField("parkBlocker")); SEED = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSeed")); PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); SECONDARY = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSecondarySeed")); } catch (Exception ex) { throw new Error(ex); } }

public static void park(Object blocker) { Thread t = Thread.currentThread(); // 通過unsafe設定互斥變數 setBlocker(t, blocker); UNSAFE.park(false, 0L); // 清空互斥變數 setBlocker(t, null); }

private static void setBlocker(Thread t, Object arg) { // 當前執行緒物件的parkBlocker屬性設定arg UNSAFE.putObject(t, parkBlockerOffset, arg); }

public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } ```

​ park 和 unpark 中呼叫了 Unsafe 的 park 和 unpark,這兩個方法被 native 修飾,說明呼叫了C實現的原生函式,不過核心在這兩個方法內,他們內部做了什麼呢,只能繼續挖掘原始碼,去官網扒拉出原生函式原始碼。如下:

parker原始碼

park原始碼邏輯: ```java void Parker::park(bool isAbsolute, jlong time) { // 原子交換操作將計數器的值改為0,同時檢查計數器的原值是否大於 0,即_counter>0,如果大於0<執行unpark將_counter改為1>則表示unpark先於park執行,本次park直接返回,否則往下執行 if (Atomic::xchg(0, &_counter) > 0) return;

Thread thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread jt = (JavaThread *)thread; // 判斷當前執行緒是否標記了中斷,如果有中斷則直接返回 if (Thread::is_interrupted(thread, false)) { return; }

timespec absTime; if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } if (time > 0) { unpackTime(&absTime, isAbsolute, time); }

ThreadBlockInVM tbivm(jt);

// // 再次判斷當前執行緒是否標記了中斷,如果有中斷則直接返回,否則通過pthread_mutex_trylock增加互斥鎖 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; }

int status ; // 以上都不命中時,再次判斷計數器是否執行過unpark if (_counter > 0) { // no wait needed // 如果執行過unpark,則將計數器重置 _counter = 0; // 計數器重置後完成互斥鎖釋放 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); return; }

sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals(); pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);

OSThreadWaitState osts(thread->osthread(), false / not Object.wait() /); jt->set_suspend_equivalent();

assert(_cur_index == -1, "invariant"); // 當無等待時長時,且未執行過unpark,呼叫pthread_cond_wait函式,查閱得該函式它會使當前執行緒加入作業系統的條件等待佇列,同時釋放 mutex 鎖並使當前執行緒掛起 // 等待其他執行緒執行pthread_cond_signal喚醒當前 if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait");

ifdef ASSERT

pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);

endif

_counter = 0 ; status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; OrderAccess::fence();

if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } } ```

unpark原始碼邏輯:

java void Parker::unpark() { int s, status ; // 呼叫pthread_mutex_lock函式,給該執行緒加互斥鎖,並通過下賦值操作將計數器賦值為1 status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; s = _counter; _counter = 1; if (s < 1) { // 計數器小於1則當前執行緒被執行過park if (_cur_index != -1) { // 判斷 Parker 物件所關聯的執行緒是否被 park,如果是,則通過 pthread_mutex_signal 函式喚醒該執行緒,最後釋放鎖。 // pthread_cond_signal和pthread_cond_wait配套使用,當前執行緒呼叫unpark的pthread_cond_signal後,則喚醒作業系統中某個執行pthread_cond_wait的等待執行緒並設定_cur_index=-1 if (WorkAroundNPTLTimedWaitHang) { status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); } else { // must capture correct index before unlocking int index = _cur_index; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); status = pthread_cond_signal (&_cond[index]); assert (status == 0, "invariant"); } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } }

梳理一下park和unpark執行邏輯:

park and unpark

​ Unsafe 的 park 和 unpark 主要依靠系統的 pthread_cond_signal 和 pthread_cond_wait 對執行緒進行掛起和恢復,來達成排程中的阻塞和喚醒效果,值得一提的是,wait 和 notify 底層也是依附這兩個函式進行執行緒掛起和恢復。

​ 經過以上內容對原始碼的分析,已然對 LockSupport 有一個簡單的認識,能夠不通過 Java 鎖來協調執行緒掛起和喚醒,還有神奇的 Unsafe 魔法類,此刻估計已經按奈不住躍躍欲試的心,那就快快把腦袋裡的想法變成程式碼吧。

推薦閱讀

資料中臺建設實踐(二)- 資料治理之資料質量

RocketMQ 延時方案分析與總結

政採雲Flutter低成本螢幕適配方案探索

招賢納士

政採雲技術團隊(Zero),一個富有激情、創造力和執行力的團隊,Base 在風景如畫的杭州。團隊現有300多名研發小夥伴,既有來自阿里、華為、網易的“老”兵,也有來自浙大、中科大、杭電等校的新人。團隊在日常業務開發之外,還分別在雲原生、區塊鏈、人工智慧、低程式碼平臺、中介軟體、大資料、物料體系、工程平臺、效能體驗、視覺化等領域進行技術探索和實踐,推動並落地了一系列的內部技術產品,持續探索技術的新邊界。此外,團隊還紛紛投身社群建設,目前已經是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等眾多優秀開源社群的貢獻者。如果你想改變一直被事折騰,希望開始折騰事;如果你想改變一直被告誡需要多些想法,卻無從破局;如果你想改變你有能力去做成那個結果,卻不需要你;如果你想改變你想做成的事需要一個團隊去支撐,但沒你帶人的位置;如果你想改變本來悟性不錯,但總是有那一層窗戶紙的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望參與到隨著業務騰飛的過程,親手推動一個有著深入的業務理解、完善的技術體系、技術創造價值、影響力外溢的技術團隊的成長過程,我覺得我們該聊聊。任何時間,等著你寫點什麼,發給 [email protected]

微信公眾號

文章同步釋出,政採雲技術團隊公眾號,歡迎關注

政採雲技術團隊.png