從簡單程式碼入手,分析執行緒池原理

語言: CN / TW / HK

一、執行緒池簡介

1、池化思想

在專案工程中,基於池化思想的技術應用很多,例如基於執行緒池的任務併發執行,中介軟體服務的連線池配置,通過對共享資源的管理,降低資源的佔用消耗,提升效率和服務效能。

池化思想從直觀感覺上理解,既有作為容器的儲存能力(持續性的承接),也要具備維持一定量的儲備能力(初始化的提供),同時作為容器又必然有大小的限制,下面通過這個基礎邏輯來詳細分析Java中的執行緒池原理。

2、執行緒池

首先熟悉JVM執行週期的都知道,在記憶體中頻繁的建立和銷燬物件是很影響效能的,而執行緒作為程序中執行的基本單位,通過執行緒池的方式重複使用已建立的執行緒,在任務執行動作上避免或減少執行緒的頻繁建立動作。

執行緒池中維護多個執行緒,當收到排程任務時可以避免建立執行緒直接執行,並以此降低服務資源的消耗,把相對不確定的併發任務管理在相對確定的執行緒池中,提高系統服務的穩定性。下文基於 JDK1.8 圍繞 ThreadPoolExecutor 類深入分析。

二、原理與週期

1、類圖設計

  • Executor 介面

原始碼註釋解讀:將來會執行命令,任務提交和執行兩個動作會被解耦,傳入Runnable任務物件即可,執行緒池會執行相應排程和任務處理。Executor雖然是ThreadPoolExecutor執行緒池的頂層介面,但是其本身只是抽象了任務的處理思想。

  • ExecutorService 介面

擴充套件Executor介面,單個或批量的給任務的執行結果生成Future,並增添任務中斷或終止的管理方法。

  • AbstractExecutorService 抽象類

提供對ExecutorService介面定義的任務執行方法(submit,invokeAll)等預設實現,提供newTaskFor方法用於構建RunnableFuture物件。

  • ThreadPoolExecutor 類

維護執行緒池生命週期,管理執行緒和任務,通過相應排程機制實現任務的併發執行。

2、基本案例

示例中建立了一個簡單的 butte-pool 執行緒池,設定4個核心執行緒執行任務,佇列容器設定256大小;在實際業務中,對於引數設定需要考量任務執行時間,服務配置,測試資料等。

public class ThrPool implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ThrPool.class) ;
/**
* 執行緒池管理,ThreadFactoryBuilder出自Guava工具庫
*/

private static final ThreadPoolExecutor DEV_POOL;
static {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("butte-pool-%d").build();
DEV_POOL = new ThreadPoolExecutor(0, 8,60L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(256),threadFactory, new ThreadPoolExecutor.AbortPolicy());
DEV_POOL.allowCoreThreadTimeOut(true);
}
/**
* 任務方法
*/

@Override
public void run() {
try {
logger.info("Print...Job...Run...;queue_size:{}",DEV_POOL.getQueue().size());
Thread.sleep(5000);
} catch (Exception e){
e.printStackTrace();
}
}
}

通過對上述執行緒池核心引數的不斷調整,以及控制任務執行時間的長短,尤其可以設定一些引數的極端值,觀察任務執行的效果,可以初步感知執行緒池的執行特點,下面圍繞該案例展開詳細的分析。

3、構造方法

在ThreadPoolExecutor類中提供多個構造方法,以滿足不同場景下執行緒池的構造需求,這裡需要描述幾個注意事項:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)

  • 從構造方法的判斷中,corePoolSize的大小允許設定為0,在分析任務執行時再細說影響;

  • 執行緒池建立後,不會立即啟動核心執行緒,通常會等到任務提交的時候再去啟動;或者主動執行 prestartCoreThread||prestartAllCoreThreads 方法;
  • 在當前版本的JDK中,CoreThread核心執行緒也是允許超時終止掉的,避免執行緒長時間閒置;

  • 如果允許核心執行緒超時終止,該方法會校驗keepAliveTime必須大於0,否則丟擲異常;

4、執行原理

執行緒池的基本執行邏輯,任務提交之後有三種處理方式:直接分配執行緒執行;或者被放入任務佇列,等待執行;如果直接被拒絕,會返回異常;任務的提交和執行被解耦,構成一個生產消費的模型。

5、生命週期

這裡從原始碼開始逐步分析執行緒池的核心邏輯,首先看看對於生命週期的狀態描述,涉及如下幾個核心欄位:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 狀態描述
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

ctl 控制執行緒池的狀態,包含兩個概念欄位: workerCount 執行緒池內有效執行緒數, runState 執行狀態,具體的執行有5種狀態描述:

  • RUNNING:接受新任務,處理阻塞佇列中的任務;

  • SHUTDOWN:不接受新任務,處理阻塞佇列中已存在的任務;

  • STOP:不接受新任務,不處理阻塞佇列中的任務,中斷正在進行的任務;

  • TIDYING:所有任務都已終止,workerCount=0,執行緒池進入該狀態後會執行 terminated() 方法;
  • TERMINATED: 執行 terminated() 方法完後進入該狀態;

狀態之間的轉換邏輯如下:

通過 runStateOf() 方法可以計算當前的執行狀態,這裡對於執行緒池生命週期的定義,以及狀態的轉換邏輯在 ctl 欄位的原始碼註釋中,更多細節可以參考該處描述文件。

三、任務管理

1、排程邏輯

從上面對執行緒池有整體的瞭解之後,現在從任務提交和執行這個核心流程入手,對原始碼和邏輯進行深入分析。任務排程作為執行緒池的核心能力,可以直接從 execute(task) 方法切入。

public void execute(Runnable command) {
// 上文描述的workerCount與runState
int c = ctl.get();
// 核心執行緒池
if (workerCountOf(c) < corePoolSize){}
// 任務佇列
if (isRunning(c) && workQueue.offer(command)){}
// 拒絕策略
else if (!addWorker(command, false)) reject(command);
}

從整體上看,任務排程被放在三個分支步驟中判斷,即:核心執行緒池、任務佇列、拒絕策略,下面再細看每個分支的處理邏輯;

1.1 核心執行緒池

// 如果有效執行緒數小於核心執行緒數,新建執行緒並綁定當前任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
}

1.2 任務佇列

// 如果執行緒池是執行狀態,並且任務新增佇列成功
if (isRunning(c) && workQueue.offer(command)) {
// 二次校驗如果是非執行狀態,則移除該任務,執行拒絕策略
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果有效執行緒數是0,執行addWorker新增方法
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

1.3 拒絕策略

// 再次執行addWorker方法,如果失敗則拒絕該任務
else if (!addWorker(command, false)) reject(command);

這樣execute方法執行邏輯,任務排程的流程如下:

如上圖任務被提交到執行緒池後的核心排程邏輯,任務既然提交自然是希望被執行的,原始碼中也多處呼叫 addWorker 方法新增工作執行緒。

2、Worker執行緒

執行緒池內工作執行緒被封裝在Worker類中,繼承AQS並實現Runnable介面,維護執行緒的建立和任務的執行:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // 持有執行緒
Runnable firstTask; // 初始化任務
}

2.1 addWorker 方法

既然新增工作執行緒,意味有任務需要執行:

  • firstTask:新建立的執行緒第一個執行的任務,允許為空或者null;

  • core:傳true,新增執行緒時判斷當前執行緒數是否小於corePoolSize;傳false,新增執行緒時判斷當前執行緒數是否小於maximumPoolSize;

private final HashSet<Worker> workers = new HashSet<Worker>();
private final BlockingQueue<Runnable> workQueue;
private boolean addWorker(Runnable firstTask, boolean core)

通過對該方法的原始碼分析,執行邏輯流程如下:

工作執行緒建立之後,在HashSet中維護和持有執行緒的引用,這樣就可以對執行緒池做相應的 put 或者 remove 操作,進而對生命週期進行管理。

2.2 runWorker 方法

在Worker類中對於run方法的實現,實際上是委託給runWorker方法,用來 週期性 執行具體的執行緒任務,同樣分析其執行邏輯:

整個執行流程通過while迴圈不斷獲取任務並執行任務,整個過程也需要不斷的校驗執行緒池狀態,及時的中斷執行緒執行,該方法執行完成後會請求執行緒銷燬動作。

3、任務佇列

執行緒池兩大核心能力執行緒和任務的管理,並且對二者解耦,通過佇列中任務的管理構建生產消費模式,不同的佇列型別有各自的存取政策;LinkedBlockingQueue建立連結串列結構的佇列,預設的 Integer.MAX_VALUE 容量過度,需要指定佇列大小,按照先進先出的原則管理;

3.1 getTask 方法

在獲取任務時,除了必要的執行緒池狀態判斷,就是要校驗當前任務的執行緒是否需要超時回收,上面已經提過即使核心執行緒池也可以設定超時時效,如果沒有獲取到任務,則認為 runWorker 方法執行完成:

3.2 reject 方法

不管是執行緒池還是任務佇列,都有容量的邊界,當容量達到上限時,就需要拒絕新提交的任務,在上述案例中採用的是ThreadPoolExecutor.AbortPolicy丟棄任務並丟擲異常,還有其他幾種策略按需選擇即可。

四、監控與配置

在大部分的專案中,對於執行緒池都是直接定義好相關引數,如果需要調整,也基本都需要服務重啟來完成,實際上執行緒池有一些放開的引數調整與查詢的方法:

setCorePoolSize 方法

在方法內部經過一系列的邏輯校驗,保證執行緒池平穩的過渡,整個流程嚴謹且複雜,結合線程池引數獲取方法,就可以進行動態化的引數配置與監控,從而實現可控的執行緒池管理:

最後關於更多執行緒池的細節問題,可以多閱讀原始碼文件,並結合案例進行實踐;執行緒池的原理在很多元件中都有應用,例如各種連線池,平行計算等,同樣值得深入學習和總結。

五、參考原始碼

應用倉庫:
https://gitee.com/cicadasmile/butte-flyer-parent

元件封裝:
https://gitee.com/cicadasmile/butte-frame-parent