Java執行緒池必知必會

語言: CN / TW / HK

1、執行緒數使用開發規約

阿里巴巴開發手冊中關於執行緒和執行緒池的使用有如下三條強制規約

【強制】建立執行緒或執行緒池時請指定有意義的執行緒名稱,方便出錯時回溯。

正例:自定義執行緒工廠,並且根據外部特徵進行分組,比如,來自同一機房的呼叫,把機房編號賦值給whatFeatureOfGroup


public class UserThreadFactory implements ThreadFactory {

private final String namePrefix;

private final AtomicInteger nextId = new AtomicInteger(1);

/**

* 定義執行緒組名稱,在利用 jstack 來排查問題時,非常有幫助

*/

UserThreadFactory(String whatFeatureOfGroup) {

namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + "-Worker-";

}

@Override

public Thread newThread(Runnable task) {

String name = namePrefix + nextId.getAndIncrement();

Thread thread = new Thread(null, task, name, 0);

System.out.println(thread.getName());

return thread;

}

}

複製程式碼

【強制】執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式建立執行緒。

說明:執行緒池的好處是減少在建立和銷燬執行緒上所消耗的時間以及系統資源的開銷,解決資源不足的問題。

如果不使用執行緒池,有可能造成系統建立大量同類執行緒而導致消耗完記憶體或者“過度切換”的問題。

【強制】執行緒池不允許使用 Executors 去建立,而是通過 ThreadPoolExecutor 的方式,這

樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。

說明:Executors 返回的執行緒池物件的弊端如下:

1) FixedThreadPool 和 SingleThreadPool:

允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

2) CachedThreadPool:

允許的建立執行緒數量為 Integer.MAX_VALUE,可能會建立大量的執行緒,從而導致 OOM。

2、 ThreadPoolExecutor原始碼

1. 建構函式

UML圖:

ThreadPoolExecutor的建構函式共有四個,但最終呼叫的都是同一個:

2.核心引數

  1. corePoolSize => 執行緒池核心執行緒數量

  2. maximumPoolSize => 執行緒池最大數量

  3. keepAliveTime => 執行緒池的工作執行緒空閒後,保持存活的時間。如果任務多而且任務的執行時間比較短,可以調大keepAliveTime,提高執行緒的利用率。

  4. unit => 時間單位

  5. workQueue => 執行緒池所使用的緩衝佇列,佇列型別有:

    • ArrayBlockingQueue,基於陣列結構的有界阻塞佇列,按FIFO(先進先出)原則對任務進行排序。使用該佇列,執行緒池中能建立的最大執行緒數為maximumPoolSize

    • LinkedBlockingQueue,基於連結串列結構的無界阻塞佇列,按FIFO(先進先出)原則對任務進行排序,吞吐量高於ArrayBlockingQueue。使用該佇列,執行緒池中能建立的最大執行緒數為corePoolSize。靜態工廠方法 Executor.newFixedThreadPool()使用了這個佇列。

    • SynchronousQueue,一個不儲存元素的阻塞佇列。新增任務的操作必須等到另一個執行緒的移除操作,否則新增操作一直處於阻塞狀態。靜態工廠方法 Executor.newCachedThreadPool()使用了這個佇列。

    • PriorityBlokingQueue:一個支援優先順序的無界阻塞佇列。使用該佇列,執行緒池中能建立的最大執行緒數為corePoolSize。

  6. threadFactory => 執行緒池建立執行緒使用的工廠

  7. handler => 執行緒池對拒絕任務的處理策略,主要有4種類型的拒絕策略:

    • AbortPolicy:無法處理新任務時,直接丟擲異常,這是預設策略。

    • CallerRunsPolicy:用呼叫者所在的執行緒來執行任務。

    • DiscardOldestPolicy:丟棄阻塞佇列中最靠前的一個任務,並執行當前任務。

    • DiscardPolicy:直接丟棄任務。

3.execute()方法

  1. 如果當前執行的執行緒少於corePoolSize,則建立新的工作執行緒來執行任務(執行這一步驟需要獲取全域性鎖)。

  2. 如果當前執行的執行緒大於或等於corePoolSize,而且BlockingQueue未滿,則將任務加入到BlockingQueue中。

  3. 如果BlockingQueue已滿,而且當前執行的執行緒小於maximumPoolSize,則建立新的工作執行緒來執行任務(執行這一步驟需要獲取全域性鎖)。

  4. 如果當前執行的執行緒大於或等於maximumPoolSize,任務將被拒絕,並呼叫RejectExecutionHandler.rejectExecution()方法。即呼叫飽和策略對任務進行處理。

3、執行緒池的工作流程

執行邏輯說明:

  1. 判斷核心執行緒數是否已滿,核心執行緒數大小和corePoolSize引數有關,未滿則建立執行緒執行任務

  2. 若核心執行緒池已滿,判斷佇列是否滿,佇列是否滿和workQueue引數有關,若未滿則加入佇列中

  3. 若佇列已滿,判斷執行緒池是否已滿,執行緒池是否已滿和maximumPoolSize引數有關,若未滿建立執行緒執行任務

  4. 若執行緒池已滿,則採用拒絕策略處理無法執執行的任務,拒絕策略和handler引數有關

4、Executors建立返回ThreadPoolExecutor物件(不推薦)

Executors建立返回ThreadPoolExecutor物件的方法共有三種:

1. Executors#newCachedThreadPool => 建立可快取的執行緒池

  • corePoolSize => 0,核心執行緒池的數量為0

  • maximumPoolSize => Integer.MAX_VALUE,可以認為最大執行緒數是無限的

  • keepAliveTime => 60L

  • unit => 秒

  • workQueue => SynchronousQueue

弊端:maximumPoolSize => Integer.MAX_VALUE可能會導致OOM

2. Executors#newSingleThreadExecutor => 建立單執行緒的執行緒池

SingleThreadExecutor是單執行緒執行緒池,只有一個核心執行緒:

  • corePoolSize => 1,核心執行緒池的數量為1

  • maximumPoolSize => 1,只可以建立一個非核心執行緒

  • keepAliveTime => 0L

  • unit => 毫秒

  • workQueue => LinkedBlockingQueue

弊端:LinkedBlockingQueue是長度為Integer.MAX_VALUE的佇列,可以認為是無界佇列,因此往佇列中可以插入無限多的任務,在資源有限的時候容易引起OOM異常

3. Executors#newFixedThreadPool => 建立固定長度的執行緒池

  • corePoolSize => 1,核心執行緒池的數量為1

  • maximumPoolSize => 1,只可以建立一個非核心執行緒

  • keepAliveTime => 0L

  • unit => 毫秒

  • workQueue => LinkedBlockingQueue

它和SingleThreadExecutor類似,唯一的區別就是核心執行緒數不同,並且由於使用的是LinkedBlockingQueue,在資源有限的時候容易引起OOM異常

5、執行緒池的合理配置

從以下幾個角度分析任務的特性:

  1. 任務的性質:CPU 密集型任務、IO 密集型任務和混合型任務。

  2. 任務的優先順序:高、中、低。

  3. 任務的執行時間:長、中、短。

  4. 任務的依賴性:是否依賴其他系統資源,如資料庫連線。

任務性質不同的任務可以用不同規模的執行緒池分開處理。可以通過 Runtime.getRuntime().availableProcessors()方法獲得當前裝置的 CPU 個數。

  • CPU 密集型任務:配置儘可能小的執行緒,如配置 cpu核心數+1 個執行緒的執行緒池。

  • IO 密集型任務 :由於執行緒並不是一直在執行任務,則配置儘可能多的執行緒,如2 ∗ Ncpu

  • 混合型任務:如果可以拆分,則將其拆分成一個 CPU 密集型任務和一個 IO 密集型任務。只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於序列執行的吞吐率;如果這兩個任務執行時間相差太大,則沒必要進行分解。

優先順序不同的任務可以使用優先順序佇列 PriorityBlockingQueue 來處理,它可以讓優先順序高的任務先得到執行。但是,如果一直有高優先順序的任務加入到阻塞佇列中,那麼低優先順序的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的執行緒池來處理,或者也可以使用優先順序佇列,讓執行時間短的任務先執行。

依賴資料庫連線池的任務,因為執行緒提交 SQL 後需要等待資料庫返回結果,執行緒數應該設定得較大,這樣才能更好的利用 CPU。

建議使用有界佇列,有界佇列能增加系統的穩定性和預警能力。可以根據需要設大一點,比如幾千。使用無界佇列,執行緒池的佇列就會越來越大,有可能會撐滿記憶體,導致整個系統不可用。

處理拒絕策略有以下幾種比較推薦:

在程式中捕獲RejectedExecutionException異常,在捕獲異常中對任務進行處理。針對預設拒絕策略使用CallerRunsPolicy拒絕策略,該策略會將任務交給呼叫execute的執行緒執行【一般為主執行緒】,此時主執行緒將在一段時間內不能提交任何任務,從而使工作執行緒處理正在執行的任務。此時提交的執行緒將被儲存在TCP佇列中,TCP佇列滿將會影響客戶端,這是一種平緩的效能降低自定義拒絕策略,只需要實現RejectedExecutionHandler介面即可如果任務不是特別重要,使用DiscardPolicy和DiscardOldestPolicy拒絕策略將任務丟棄也是可以的如果使用Executors的靜態方法建立ThreadPoolExecutor物件,可以通過使用Semaphore對任務的執行進行限流也可以避免出現OOM異常。

6、拒絕策略

有以下幾種比較推薦:

  • 在程式中捕獲RejectedExecutionException異常,在捕獲異常中對任務進行處理。針對預設拒絕策略

  • 使用CallerRunsPolicy拒絕策略,該策略會將任務交給呼叫execute的執行緒執行【一般為主執行緒】,此時主執行緒將在一段時間內不能提交任何任務,從而使工作執行緒處理正在執行的任務。此時提交的執行緒將被儲存在TCP佇列中,TCP佇列滿將會影響客戶端,這是一種平緩的效能降低

  • 自定義拒絕策略,只需要實現RejectedExecutionHandler介面即可

  • 如果任務不是特別重要,使用DiscardPolicy和DiscardOldestPolicy拒絕策略將任務丟棄也是可以的如果使用Executors的靜態方法建立ThreadPoolExecutor物件,可以通過使用Semaphore對任務的執行進行限流也可以避免出現OOM異常

7、執行緒池的五種執行狀態

執行緒狀態:

不同於執行緒狀態,執行緒池也有如下幾種 狀態:

RUNNING :該狀態的執行緒池既能接受新提交的任務,又能處理阻塞佇列中任務。

SHUTDOWN:該狀態的執行緒池不能接收新提交的任務,但是能處理阻塞佇列中的任務。(政府服務大廳不在允許群眾拿號了,處理完手頭的和排隊的政務就下班)


處於 RUNNING 狀態時,呼叫 shutdown()方法會使執行緒池進入到該狀態。

注意:finalize() 方法在執行過程中也會隱式呼叫shutdown()方法。

複製程式碼

STOP:該狀態的執行緒池不接受新提交的任務,也不處理在阻塞佇列中的任務,還會中斷正在執行的任務。(政府服務大廳不再進行服務了,拿號、排隊、以及手頭工作都停止了。)


線上程池處於 RUNNING 或 SHUTDOWN 狀態時,呼叫shutdownNow() 方法會使執行緒池進入到該狀態;

複製程式碼

TIDYING:如果所有的任務都已終止,workerCount (有效執行緒數)=0。


執行緒池進入該狀態後會呼叫 terminated() 鉤子方法進入TERMINATED 狀態。

複製程式碼

TERMINATED:在terminated()鉤子方法執行完後進入該狀態,預設terminated()鉤子方法中什麼也沒有做。

最後

如果你覺得此文對你有一丁點幫助,點個贊。或者可以加入我的開發交流群:1025263163相互學習,我們會有專業的技術答疑解惑

如果你覺得這篇文章對你有點用的話,麻煩請給我們的開源專案點點star:http://github.crmeb.net/u/defu不勝感激 !

PHP學習手冊:http://doc.crmeb.com
技術交流論壇:http://q.crmeb.com