☕【Java技術指南】「併發編程專題」Fork/Join框架基本使用和原理探究(原理篇)

語言: CN / TW / HK

theme: mk-cute

ForkJoin線程池框架回顧

  • ForkJoin框架其實就是一個線程池ExecutorService的實現,通過工作竊取(work-stealing)算法,獲取其他線程中未完成的任務來執行。

  • 可以充分利用機器的多處理器優勢,利用空閒的線程去並行快速完成一個可拆分為小任務的大任務,類似於分治算法。

  • ForkJoin的目標,就是利用所有可用的處理能力來提高程序的響應和性能。本文將介紹ForkJoin框架,源碼剖析。

ForkJoinPool的類架構圖

ForkJoinPool核心類實現

  • ForkJoin框架的核心是ForkJoinPool類,基於AbstractExecutorService擴展。
  • ForkJoinPool中維護了一個隊列數組WorkQueue[],每個WorkQueue維護一個ForkJoinTask數組和當前工作線程。
  • ForkJoinPool實現了工作竊取(work-stealing)算法並執行ForkJoinTask。

ForkJoinPool,所有線程和WorkQueue共享,用於工作竊取、任務狀態和工作狀態同步。

核心屬性介紹

  • ADD_WORKER: 100000000000000000000000000000000000000000000000 -> 1000 0000 0000 0000,用來配合ctl在控制線程數量時使用
  • ctl: 控制ForkJoinPool創建線程數量,(ctl & ADD_WORKER) != 0L 時創建線程,也就是當ctl的第16位不為0時,可以繼續創建線程
  • defaultForkJoinWorkerThreadFactory: 默認線程工廠,默認實現是DefaultForkJoinWorkerThreadFactory
  • runState: 全局鎖控制,全局運行狀態
  • workQueues: 工作隊列數組WorkQueue[]
  • config: 記錄並行數量和ForkJoinPool的模式(異步或同步)

ForkJoinTask

  • status: 任務的狀態,對其他工作線程和pool可見,運行正常則status為負數,異常情況為正數

WorkQueue

  • qlock: 併發控制,put任務時的鎖控制

  • array: 任務數組ForkJoinTask<?>[]

  • pool: ForkJoinPool,所有線程和WorkQueue共享,用於工作竊取、任務狀態和工作狀態同步

  • base: array數組中取任務的下標

  • top: array數組中放置任務的下標

  • owner: 所屬線程,ForkJoin框架中,只有一個WorkQueue是沒有owner的,其他的均有具體線程owner。

  • WorkQueue 內部就是ForkJoinTask

workQueue: 當前線程的任務隊列,與WorkQueue的owner呼應


ForkJoinTask是能夠在ForkJoinPool中執行的任務抽象類,父類是Future,具體實現類有很多,這裏主要關注RecursiveAction和RecursiveTask。

  • RecursiveAction是沒有返回結果的任務

  • RecursiveTask是需要返回結果的任務

只需要實現其compute()方法,在compute()中做最小任務控制,任務分解(fork)和結果合併(join)。

ForkJoinWorkerThread

ForkJoinPool中執行的默認線程是ForkJoinWorkerThread,由默認工廠產生,可以自己重寫要實現的工作線程。同時會將ForkJoinPool引用放在每個工作線程中,供工作竊取時使用。

  • pool: ForkJoinPool,所有線程和WorkQueue共享,用於工作竊取、任務狀態和工作狀態同步

  • workQueue: 當前線程的任務隊列,與WorkQueue的owner呼應


  • ForkJoinPool作為最核心的組件,維護了所有的任務隊列WorkQueues,workQueues維護着所有線程池的工作線程,工作竊取算法就是在這裏進行的。

  • 每一個WorkQueue對象中使用pool保留對ForkJoinPool的引用,用來獲取其WorkQueues來竊取其他工作線程的任務來執行。

  • 同時WorkQueue對象中的owner是ForkJoinWorkerThread工作線程,綁定ForkJoinWorkerThread和WorkQueue的一對一關係,每個工作線程會優先完成自己隊列的任務,當自己隊列中的任務為空時,才會通過工作竊取算法從其他任務隊列中獲取任務。

  • WorkQueue中的ForkJoinTask<?>[] array,是每一個具體的任務,插入array中的第一個任務是最大的任務。

源碼分析

ForkJoinPool構造函數

ForkJoinPool有四個構造函數,其中參數最全的那個構造函數如下所示: ```java public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) ````

  • parallelism:可並行級別,Fork/Join框架將依據這個並行級別的設定,決定框架內並行執行的線程數量。並行的每一個任務都會有一個線程進行處理,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的線程數量,也不要將這個屬性和ThreadPoolExecutor線程池中的corePoolSize、maximumPoolSize屬性進行比較,因為ForkJoinPool的組織結構和工作方式與後者完全不一樣。

  • factory:當Fork/Join框架創建一個新的線程時,同樣會用到線程創建工廠。只不過這個線程工廠不再需要實現ThreadFactory接口,而是需要實現ForkJoinWorkerThreadFactory接口。

    • 後者是一個函數式接口,只需要實現一個名叫newThread的方法。

    • 在Fork/Join框架中有一個默認的ForkJoinWorkerThreadFactory接口實現:DefaultForkJoinWorkerThreadFactory。

  • handler:異常捕獲處理器。當執行的任務中出現異常,並從任務中被拋出時,就會被handler捕獲。

  • asyncMode:這個參數也非常重要,從字面意思來看是指的異步模式,它並不是説Fork/Join框架是採用同步模式還是採用異步模式工作。

    • Fork/Join框架中為每一個獨立工作的線程準備了對應的待執行任務隊列,這個任務隊列是使用數組進行組合的雙向隊列。即是説存在於隊列中的待執行任務,即可以使用先進先出的工作模式,也可以使用後進先出的工作模式。

當asyncMode設置為true的時候,隊列採用先進先出方式工作;反之則是採用後進先出的方式工作,該值默認為false

......
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
......
  • ForkJoinPool還有另外兩個構造函數,一個構造函數只帶有parallelism參數,既是可以設定Fork/Join框架的最大並行任務數量;
  • 另一個構造函數則不帶有任何參數,對於最大並行任務數量也只是一個默認值——當前操作系統可以使用的CPU內核數量(Runtime.getRuntime().availableProcessors())。
  • 實際上ForkJoinPool還有一個私有的、原生構造函數,之上提到的三個構造函數都是對這個私有的、原生構造函數的調用。

java private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }

使用案例

java ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

先看ForkJoinPool的創建過程,這個比較簡單,創建了一個ForkJoinPool對象,帶有默認ForkJoinWorkerThreadFactory,並行數跟機器核數一樣,同步模式。

提交任務

forkJoinPool.invoke(new CountRecursiveTask(1, 100));會先執行到ForkJoinPool#externalPush中,此時forkJoinPool.workQueues並沒有完成初始化工作,所以執行到ForkJoinPool#externalSubmit。

externalSubmit

這裏是一個for無限循環實現,跳出邏輯全部在內部控制,主要結合runState來控制。

  1. 建ForkJoinPool的WorkQueue[]變量workQueues,長度為大於等於2倍並行數量的且是2的n次冪的數。這裏對傳入的並行數量使用了位運算,來計算出workQueues的長度。

  2. 創建一個WorkQueue變量q,q.base=q.top=4096,q的owner為null,無工作線程,放入workQueues數組中

  3. 創建q.array對象,長度8192,將ForkJoinTask也就是代碼案例中的CountRecursiveTask放入q.array,pool為傳入的ForkJoinPool,並將q.top加1,完成後q.base=4096,q.top=4097。然後執行ForkJoinPool#signalWork方法。(base下標表示用來取數據的,top下標表示用來放數據的,當base小於top時,説明有數據可以取)

externalSubmit主要完成3個小步驟工作,每個步驟都使用了鎖的機制來處理併發事件,既有對runState使用ForkJoinPool的全局鎖,也有對WorkQueue使用局部鎖。

signalWork

signalWork方法的簽名是:void signalWork(WorkQueue[] ws, WorkQueue q)。ws為ForkJoinPool中的workQueues,q為externalSubmit方法中新建的用於存放ForkJoinTask的WorkQueue.

  • signalWork中會根據ctl的值判斷是否需要創建創建工作線程,當前暫無,因此走到tryAddWorker(),並在createWorker()來創建,使用默認工廠方法ForkJoinWorkerThread#ForkJoinWorkerThread(ForkJoinPool)來創建一個ForkJoinWorkerThread,ForkJoinPool為前面創建的pool。

  • 並創建一個WorkQueue其owner為新創建的工作線程,其array為空,被命名為ForkJoinPool-1-worker-1,且將其存放在pool.workQueues數組中。

  • 創建完線程之後,工作線程start()開始工作。

  • 這樣就創建了兩個WorkQueue存放在pool.workQueues,其中一個WorkQueue保存了第一個大的ForkJoinTask,owner為null,其base=4096,top=4097;第二個WorkQueue的owner為新建的工作線程,array為空,暫時無數據,base=4096,top=4096。

ForkJoinWorkerThread#run
  • 執行ForkJoinWorkerThread線程ForkJoinPool-1-worker-1,執行點來到ForkJoinWorkerThread#run,注意這裏是在ForkJoinWorkerThread中,此時的workQueue.array還是空的,pool為文中唯一的一個,是各個線程會共享的。

  • run方法中首先是一個判斷 if (workQueue.array == null) { // only run once,這也驗證了我們前面的分析,當前線程的workQueue.array是空的。每個新建的線程,擁有的workQueue.array是沒有任務的。那麼它要執行的任務從哪裏來?

  • runWorker()方法中會執行一個死循環,去scan掃描是否有任務可以執行。全文的講到的工作竊取work-stealing算法,就在java.util.concurrent.ForkJoinPool#scan。當有了上圖的模型概念時,這個方法的實現看過就會覺得其實非常簡單。

java WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; //如果pool.workQueues即ws的k下標元素不為空 if ((q = ws[k]) != null) { //如果base<top且array不為空,則説明有元素。為什麼還需要array不為空才説明有元素? //從下面可以知道由於獲取元素後才會設置base=base+1,所以可能出現上一個線程拿到元素了但是沒有及時更新base if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //這裏使用getObjectVolatile去獲取當前WorkQueue的元素 //volatile是保證線程可見性的,也就是上一個線程可能已經拿掉了,可能已經將這個任務置為空了。 if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { if (ss >= 0) { //拿到任務之後,將array中的任務用CAS的方式置為null,並將base加1 if (U.compareAndSwapObject(a, i, t, null)) { q.base = b + 1; if (n < -1) // signal others signalWork(ws, q); return t; } } else if (oldSum == 0 && // try to activate w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; }

CountRecursiveTask#compute

重寫compute方法一般需要遵循這個規則來寫

if(任務足夠小){ 直接執行任務; 如果有結果,return結果; }else{ 拆分為2個子任務; 分別執行子任務的fork方法; 執行子任務的join方法; 如果有結果,return合併結果; }

```java public final ForkJoinTask fork() { Thread t; //如果是工作線程,則往自己線程中的workQuerue中添加子任務;否則走首次添加邏輯 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } ````

ForkJoinPool.WorkQueue#push方法會將當前子任務存放到array中,並調用ForkJoinPool#signalWork添加線程或等待其他線程去竊取任務執行。過程又回到前面講到的signalWork流程。

ForkJoinTask#externalAwaitDone
  • 主線程在把任務放置在第一個WorkQueue的array之後,啟動工作線程就退出了。如果使用的是異步的方式,則使用Future的方式來獲取結果,即提交的ForkJoinTask,通過isDone(),get()方法判斷和得到結果。

  • 異步的方式跟同步方式在防止任務的過程是一樣的,只是主線程可以任意時刻再通過ForkJoinTask去跟蹤結果。本案例用的是同步的寫法,因此主線程最後在ForkJoinTask#externalAwaitDone等待任務完成。

  • 這裏主線程會執行Object#wait(long),使用的是Object類中的wait,在當前ForkJoinTask等待,直到被notify。而notify這個動作會在ForkJoinTask#setCompletion中進行,這裏使用的是notifyAll,因為需要通知的有主線程和工作線程,他們都共同享用這個對象,需要被喚起。

ForkJoinTask#join

來看left.join() + right.join(),在將left和right的Task放置在當前工作線程的workQueue之後,執行join()方法,join()方法最終會在ForkJoinPool.WorkQueue#tryRemoveAndExec中將剛放入的left取出,將對應workQueue中array的left任務置為空,然後執行left任務。然後執行到left的compute方法。對於right任務也是一樣,繼續子任務的fork和join工作,如此循環往復。

java public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } 當工作線程執行結束後,會執行getRawResult,拿到結果。

Work-Steal算法

相比其他線程池實現,這個是ForkJoin框架中最大的亮點。當空閒線程在自己的WorkQueue沒有任務可做的時候,會去遍歷其他的WorkQueue,並進行任務竊取和執行,提高程序響應和性能。

取2的n次冪作為長度的實現

java //代碼位於java.util.concurrent.ForkJoinPool#externalSubmit if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two int p = config & SMASK; // ensure at least 2 slots int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n]; ns = STARTED; }

這裏的p其實就是設置的並行線程數,在為ForkJoinPool創建WorkQueue[]數組時,會對傳入的p進行一系列位運算,最終得到一個大於等於2p的2的n次冪的數組長度

內存屏障

java //代碼位於java.util.concurrent.ForkJoinPool#externalSubmit if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; //通過Unsafe進行內存值的設置,高效,且屏蔽了處理器和Java編譯器的指令亂序問題 U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); submitted = true; }

這裏在對單個WorkQueue的array進行push任務操作時,先後使用了putOrderedObject和putOrderedInt,確保程序執行的先後順序,同時這種直接操作內存地址的方式也會更加高效。

高併發:細粒度WorkQueue的鎖

java //代碼位於java.util.concurrent.ForkJoinPool#externalSubmit //如果qlock為0,説明當前沒有其他線程操作改WorkQueue //嘗試CAS操作,修改qlock為1,對這個WorkQueue進行加鎖 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); submitted = true; } } finally { //finally將qlock置為0,進行鎖的釋放,其他線程可以使用 U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { signalWork(ws, q); return; } }

這裏對單個WorkQueue的array進行push任務操作時,使用了qlock的CAS細粒度鎖,讓併發只落在一個WOrkQueue中,而不是整個pool中,極大提高了程序的併發性能,類似於ConcurrentHashMap。

「其他文章」