一文弄懂Java中線程池原理

語言: CN / TW / HK

theme: juejin

本文正在參加「金石計劃 . 瓜分6萬現金大獎」

在工作中,我們經常使用線程池,但是你真的瞭解線程池的原理嗎?同時,線程池工作原理和底層實現原理也是面試經常問的考題,所以,今天我們一起聊聊線程池的原理吧。

為什麼要用線程池

使用線程池主要有以下三個原因:

  1. 降低資源消耗。通過重複利用已創建的線程降低線程創建和銷燬造成的消耗。
  2. 提升響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
  3. 可以對線程做統一管理。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統穩定性,使用線程池可以進行統一分配、調優和監控。

線程池的原理

Java中的線程池頂層接口是Executor接口,ThreadPoolExecutor是這個接口的實現類。

我們先看看ThreadPoolExecutor類。

ThreadPoolExecutor提供的構造方法

java // 七個參數的構造函數 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

我們先看看這些參數是什麼意思:

  • int corePoolSize:該線程池中核心線程數最大值

核心線程:線程池中有兩類線程,核心線程和非核心線程。核心線程默認情況下會一直存在於線程池中,即使這個核心線程什麼都不幹(鐵飯碗),而非核心線程如果長時間的閒置,就會被銷燬(臨時工)。

  • int maximumPoolSize:該線程池中線程總數最大值

該值等於核心線程數量 + 非核心線程數量。

  • long keepAliveTime非核心線程閒置超時時長

非核心線程如果處於閒置狀態超過該值,就會被銷燬。如果設置allowCoreThreadTimeOut(true),則會也作用於核心線程。

  • TimeUnit unit:keepAliveTime的單位。

TimeUnit是一個枚舉類型。

  • BlockingQueue workQueue:阻塞隊列,維護着等待執行的Runnable任務對象

常用的幾個阻塞隊列:

  1. LinkedBlockingQueue:鏈式阻塞隊列,底層數據結構是鏈表,默認大小是Integer.MAX_VALUE,也可以指定大小。

  2. ArrayBlockingQueue:數組阻塞隊列,底層數據結構是數組,需要指定隊列的大小。

  3. SynchronousQueue:同步隊列,內部容量為0,每個put操作必須等待一個take操作,反之亦然。

  4. DelayQueue:延遲隊列,該隊列中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素 。

  5. ThreadFactory threadFactory

創建線程的工廠 ,用於批量創建線程,統一在創建線程時設置一些參數,如是否守護線程、線程的優先級等。如果不指定,會新建一個默認的線程工廠。

```java static class DefaultThreadFactory implements ThreadFactory { // 省略屬性 // 構造函數 DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; }

// 省略

} ```

  • RejectedExecutionHandler handler

拒絕處理策略,線程數量大於最大線程數就會採用拒絕處理策略,四種拒絕處理的策略為 :

  1. ThreadPoolExecutor.AbortPolicy默認拒絕處理策略,丟棄任務並拋出RejectedExecutionException異常。
  2. ThreadPoolExecutor.DiscardPolicy:丟棄新來的任務,但是不拋出異常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列頭部(最舊的)的任務,然後重新嘗試執行程序(如果再次失敗,重複此過程)。
  4. ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務。

ThreadPoolExecutor的策略

線程池本身有一個調度線程,這個線程就是用於管理布控整個線程池裏的各種任務和事務,例如創建線程、銷燬線程、任務隊列管理、線程隊列管理等等。

故線程池也有自己的狀態。ThreadPoolExecutor類中使用了一些final int常量變量來表示線程池的狀態 ,分別為RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。

java // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;

  • 線程池創建後處於RUNNING狀態。

  • 調用shutdown()方法後處於SHUTDOWN狀態,線程池不能接受新的任務,清除一些空閒worker,不會等待阻塞隊列的任務完成。

  • 調用shutdownNow()方法後處於STOP狀態,線程池不能接受新的任務,中斷所有線程,阻塞隊列中沒有被執行的任務全部丟棄。此時,poolsize=0,阻塞隊列的size也為0。

  • 當所有的任務已終止,ctl記錄的”任務數量”為0,線程池會變為TIDYING狀態。接着會執行terminated()函數。

  • 線程池處在TIDYING狀態時,執行完terminated()方法之後,就會由 TIDYING -> TERMINATED, 線程池被設置為TERMINATED狀態。

線程池主要的任務處理流程

處理任務的核心方法是execute,我們看看 JDK 1.8 源碼中ThreadPoolExecutor是如何處理線程任務的:

java // JDK 1.8 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 1.當前線程數小於corePoolSize,則調用addWorker創建核心線程執行任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果不小於corePoolSize,則將任務添加到workQueue隊列。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 2.1 如果isRunning返回false(狀態檢查),則remove這個任務,然後執行拒絕策略。 if (! isRunning(recheck) && remove(command)) reject(command); // 2.2 線程池處於running狀態,但是沒有線程,則創建線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3.如果放入workQueue失敗,則創建非核心線程執行任務, // 如果這時創建非核心線程失敗(當前線程總數不小於maximumPoolSize時),就會執行拒絕策略。 else if (!addWorker(command, false)) reject(command); }

ctl.get()是獲取線程池狀態,用int類型表示。第二步中,入隊前進行了一次isRunning判斷,入隊之後,又進行了一次isRunning判斷。

為什麼要二次檢查線程池的狀態?

在多線程的環境下,線程池的狀態是時刻發生變化的。很有可能剛獲取線程池狀態後線程池狀態就改變了。判斷是否將command加入workqueue是線程池之前的狀態。倘若沒有二次檢查,萬一線程池處於非RUNNING狀態(在多線程環境下很有可能發生),那麼command永遠不會執行。

總結一下處理流程

  1. 線程總數量 < corePoolSize,無論線程是否空閒,都會新建一個核心線程執行任務(讓核心線程數量快速達到corePoolSize,在核心線程數量 < corePoolSize時)。注意,這一步需要獲得全局鎖。
  2. 線程總數量 >= corePoolSize時,新來的線程任務會進入任務隊列中等待,然後空閒的核心線程會依次去緩存隊列中取任務來執行(體現了線程複用)。
  3. 當緩存隊列滿了,説明這個時候任務已經多到爆棚,需要一些“臨時工”來執行這些任務了。於是會創建非核心線程去執行這個任務。注意,這一步需要獲得全局鎖。
  4. 緩存隊列滿了, 且總線程數達到了maximumPoolSize,則會採取上面提到的拒絕策略進行處理。

整個過程如圖所示:

image.png

ThreadPoolExecutor如何做到線程複用的?

我們知道,一個線程在創建的時候會指定一個線程任務,當執行完這個線程任務之後,線程自動銷燬。但是線程池卻可以複用線程,即一個線程執行完線程任務後不銷燬,繼續執行另外的線程任務。那麼,線程池如何做到線程複用呢?

原來,ThreadPoolExecutor在創建線程時,會將線程封裝成工作線程worker,並放入工作線程組中,然後這個worker反覆從阻塞隊列中拿任務去執行。

這裏的addWorker方法是在上面提到的execute方法裏面調用的,先看看上半部分:

```java // ThreadPoolExecutor.addWorker方法源碼上半部分 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            // 1.如果core是ture,證明需要創建的線程為核心線程,則先判斷當前線程是否大於核心線程
            // 如果core是false,證明需要創建的是非核心線程,則先判斷當前線程數是否大於總線程數
            // 如果不小於,則返回false
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

```

上半部分主要是判斷線程數量是否超出閾值,超過了就返回false。我們繼續看下半部分:

```java // ThreadPoolExecutor.addWorker方法源碼下半部分 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 1.創建一個worker對象 w = new Worker(firstTask); // 2.實例化一個Thread對象 final Thread t = w.thread; if (t != null) { // 3.線程池全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            // 4.啟動這個線程
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;

} ```

創建worker對象,並初始化一個Thread對象,然後啟動這個線程對象。

我們接着看看Worker類,僅展示部分源碼:

```java // Worker類部分源碼 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ final Thread thread; Runnable firstTask;

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
        runWorker(this);
}
//其餘代碼略...

} ```

Worker類實現了Runnable接口,所以Worker也是一個線程任務。在構造方法中,創建了一個線程,線程的任務就是自己。故addWorker方法調用addWorker方法源碼下半部分中的第4步t.start,會觸發Worker類的run方法被JVM調用。

我們再看看runWorker的邏輯:

java // Worker.runWorker方法源代碼 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 1.線程啟動之後,通過unlock方法釋放鎖 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 2.Worker執行firstTask或從workQueue中獲取任務,如果getTask方法不返回null,循環不退出 while (task != null || (task = getTask()) != null) { // 2.1進行加鎖操作,保證thread不被其他線程中斷(除非線程池被中斷) w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 2.2檢查線程池狀態,倘若線程池處於中斷狀態,當前線程將中斷。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 2.3執行beforeExecute beforeExecute(wt, task); Throwable thrown = null; try { // 2.4執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 2.5執行afterExecute方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // 2.6解鎖操作 w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

首先去執行創建這個worker時就有的任務,當執行完這個任務後,worker的生命週期並沒有結束,在while循環中,worker會不斷地調用getTask方法從阻塞隊列中獲取任務然後調用task.run()執行任務,從而達到複用線程的目的。只要getTask方法不返回null,此線程就不會退出。

當然,核心線程池中創建的線程想要拿到阻塞隊列中的任務,先要判斷線程池的狀態,如果STOP或者TERMINATED,返回null

最後看看getTask方法的實現:

```java // Worker.getTask方法源碼 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out?

for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }

    int wc = workerCountOf(c);

    // Are workers subject to culling?
    // 1.allowCoreThreadTimeOut變量默認是false,核心線程即使空閒也不會被銷燬
    // 如果為true,核心線程在keepAliveTime內仍空閒則會被銷燬。 
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    // 2.如果運行線程數超過了最大線程數,但是緩存隊列已經空了,這時遞減worker數量。

// 如果有設置允許線程超時或者線程數量超過了核心線程數量, // 並且線程在規定時間內均未poll到任務且隊列為空則遞減worker數量 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }

    try {
        // 3.如果timed為true(想想哪些情況下timed為true),則會調用workQueue的poll方法獲取任務.
        // 超時時間是keepAliveTime。如果超過keepAliveTime時長,
        // poll返回了null,上邊提到的while循序就會退出,線程也就執行完了。
        // 如果timed為false(allowCoreThreadTimeOut為false
        // 且wc > corePoolSize為false),則會調用workQueue的take方法阻塞在當前。
        // 隊列中有任務加入時,線程被喚醒,take方法返回任務,並執行。
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}

} ```

核心線程的會一直卡在workQueue.take方法,被阻塞並掛起,不會佔用CPU資源,直到拿到Runnable 然後返回(當然如果allowCoreThreadTimeOut設置為true,那麼核心線程就會去調用poll方法,因為poll可能會返回null,所以這時候核心線程滿足超時條件也會被銷燬)。

非核心線程會workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超時還沒有拿到,下一次循環判斷compareAndDecrementWorkerCount就會返回null,Worker對象的run()方法循環體的判斷為null,任務結束,然後線程被系統回收 。

四種常見的線程池

Executors類中提供的幾個靜態方法來創建線程池。

newCachedThreadPool

java public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

CacheThreadPool運行流程如下:

  1. 提交任務進線程池。
  2. 因為corePoolSize為0的關係,不創建核心線程,線程池最大為Integer.MAX_VALUE。
  3. 嘗試將任務添加到SynchronousQueue隊列。
  4. 如果SynchronousQueue入列成功,等待被當前運行的線程空閒後拉取執行。如果當前沒有空閒線程,那麼就創建一個非核心線程,然後從SynchronousQueue拉取任務並在當前線程執行。
  5. 如果SynchronousQueue已有任務在等待,入列操作將會阻塞。

當需要執行很多短時間的任務時,CacheThreadPool的線程複用率比較高, 會顯著的提高性能。而且線程60s後會回收,意味着即使沒有任務進來,CacheThreadPool並不會佔用很多資源。

newFixedThreadPool

java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

核心線程數量和總線程數量相等,都是傳入的參數nThreads,所以只能創建核心線程,不能創建非核心線程。因為LinkedBlockingQueue的默認大小是Integer.MAX_VALUE,故如果核心線程空閒,則交給核心線程處理;如果核心線程不空閒,則入列等待,直到核心線程空閒。

與CachedThreadPool的區別

  • 因為 corePoolSize == maximumPoolSize ,所以FixedThreadPool只會創建核心線程。 而CachedThreadPool因為corePoolSize=0,所以只會創建非核心線程。
  • 在 getTask() 方法,如果隊列裏沒有任務可取,線程會一直阻塞在 LinkedBlockingQueue.take() ,線程不會被回收。 CachedThreadPool會在60s後收回。
  • 由於線程不會被回收,會一直卡在阻塞,所以沒有任務的情況下, FixedThreadPool佔用資源更多
  • 都幾乎不會觸發拒絕策略,但是原理不同。FixedThreadPool是因為阻塞隊列可以很大(最大為Integer最大值),故幾乎不會觸發拒絕策略;CachedThreadPool是因為線程池很大(最大為Integer最大值),幾乎不會導致線程數量大於最大線程數,故幾乎不會觸發拒絕策略。

newSingleThreadExecutor

java public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

有且僅有一個核心線程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不會創建非核心線程。所有任務按照先來先執行的順序執行。如果這個唯一的線程不空閒,那麼新來的任務會存儲在任務隊列裏等待執行。

newScheduledThreadPool

創建一個定長線程池,支持定時及週期性任務執行。

```java public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

//ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } ```

四種常見的線程池基本夠我們使用了,但是《阿里巴巴開發手冊》不建議我們直接使用Executors類中的線程池,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學需要更加明確線程池的運行規則,規避資源耗盡的風險。

但如果你及團隊本身對線程池非常熟悉,又確定業務規模不會大到資源耗盡的程度(比如線程數量或任務隊列長度可能達到Integer.MAX_VALUE)時,其實是可以使用JDK提供的這幾個接口的,它能讓我們的代碼具有更強的可讀性。

小結

在工作中,很多人因為不瞭解線程池的實現原理,把線程池配置錯誤,從而導致各種問題。希望你們閲讀完本文,能夠學會合理的使用線程池。

對於真正想弄懂java併發編程的小夥伴,網上的文章還有視頻缺乏系統性,我建議大家還是買點書籍看看,我推薦兩本我看過的書。

《Java併發編程實戰》:這本書深入淺出地介紹了Java線程和併發,是一本非常棒的Java併發參考手冊。

《Java併發編程藝術》:Java併發編程的概念本來就比較複雜,我們需要的是一本能夠把原理解釋清楚的書籍,而這本《Java併發編程的藝術》書是國內作者寫的Java併發書籍,剛好就比上面那一本更簡單易懂,至少我自己看下來是這樣的感覺。