執行緒池底層原理詳解與原始碼分析
【1】為什麼要使用執行緒池?
示例演示:
//設定業務模擬 class MyRunnable implements Runnable { private int count; public MyRunnable(int count) { this.count = count; } public int getCount() { return count; } @Override public void run() { for (int i = 0; i < 100000; i++) { count += i; } System.out.println("結果:"+count); } } //模擬執行緒池複用執行緒執行業務 public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; ExecutorService executorService = Executors.newSingleThreadExecutor(); MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { executorService.execute(myRunnable); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("時間:"+(System.currentTimeMillis() - start)); } //模擬每次執行業務都開一個執行緒 public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { Thread thread = new Thread(myRunnable); thread.start(); thread.join(); } System.out.println("時間:" + (System.currentTimeMillis() - start)); }
示例結果:
採用每次都開一個執行緒的結果是292毫秒,而執行緒池的是69毫秒。(隨著業務次數的增多這個數值的差距會越大)
示例說明:
如果每個請求到達就建立一個新執行緒,開銷是相當大的。在實際使用中, 伺服器在建立和銷燬執行緒上花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的使用者請求的時間和資源要多的多 。除了建立和銷燬執行緒的開銷之外,活動的執行緒也需要消耗系統資源。
如果併發的請求數量非常多,但每個執行緒執行的時間很短,這樣就會頻繁的建立和銷燬執行緒,如此一來會大大降低系統的效率。可能出現伺服器在為每個請求建立新執行緒和銷燬執行緒上花費的時間和消耗的系統資源要比處理實際的使用者請求的時間和資源更多 。(說明了我們什麼時候使用執行緒池:1.單個任務處理時間比較短;2.需要處理的任務數量很大;)
執行緒池主要用來解決執行緒生命週期開銷問題和資源不足問題。 通過對多個任務重複使用執行緒,執行緒建立的開銷就被分攤到了多個任務上了,而且由於在請求到達時執行緒已經存在,所以消除了執行緒建立所帶來的延遲 。這樣,就可以立即為請求服務,使用應用程式響應更快。另外, 通過適當的調整執行緒中的執行緒數目可以防止出現資源不足的情況 。
【2】執行緒池的介紹
(1) 執行緒池優勢
1. 重用存在的執行緒,減少執行緒建立,消亡的開銷,提高效能
2.提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。
3.提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。
(2)常見執行緒池
1. newSingleThreadExecutor : 單個執行緒的執行緒池,即執行緒池中每次只有一個執行緒工作,單執行緒序列執行任務
2. newFixedThreadExecutor (n) : 固定數量的執行緒池,每提交一個任務就是一個執行緒,直到達到執行緒池的最大數量,然後後面進入等待佇列直到前面的任務完成才繼續執行
3. newCacheThreadExecutor (推薦使用) : 可快取執行緒池, 當執行緒池大小超過了處理任務所需的執行緒,那麼就會回收部分空閒(一般是60秒無執行)的執行緒,當有任務來時,又智慧的新增新執行緒來執行。
4. newScheduleThreadExecutor : 大小無限制的執行緒池,支援定時和週期性的執行執行緒
5.常見執行緒池的說明
在阿里的開發手冊中其實不推薦我們使用預設的執行緒池,為什麼?
【1】Executors 返回的執行緒池物件的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM 。
2)CachedThreadPool 和 ScheduledThreadPool:
允許的建立執行緒數量為 Integer.MAX_VALUE,可能會建立大量的執行緒,從而導致 OOM 。
【2】其次newCacheThreadExecutor,沒有核心執行緒數,且非核心執行緒數是最大值,不斷建立執行緒容易出現CPU100%的問題。
(3)預設執行緒池
1. ThreadPoolExecutor
1)說明
實際上不管是newSingleThreadExecutor,newFixedThreadExecutor還是newCacheThreadExecutor,他們都是使用ThreadPoolExecutor去生成的。
只不過由於引數不同導致產生的執行緒池的不同,因此,我們常使用是ThreadPoolExecutor去自建自己想要的執行緒池。
2)引數解析
1. corePoolSize
執行緒池中的核心執行緒數,當提交一個任務時,執行緒池建立一個新執行緒執行任務,直到當前執行緒數等於corePoolSize;如果當前執行緒數為corePoolSize,繼續提交的任務被儲存到 阻塞佇列中,等待被執行;如果執行了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有核心執行緒。
2. maximumPoolSize
執行緒池中允許的最大執行緒數。如果當前阻塞佇列滿了,且繼續提交任務,則建立新的執行緒執行任務,前提是當前執行緒數小於maximumPoolSize;
3. keepAliveTime
執行緒池維護執行緒所允許的空閒時間。當執行緒池中的執行緒數量大於corePoolSize的時候,如果這時沒有新的任務提交,核心執行緒外的執行緒不會立即銷燬,而是會等待,直到等待的時間超過了keepAliveTime;
4. unit
keepAliveTime的單位;
5. workQueue
用來儲存等待被執行的任務的阻塞佇列,且任務必須實現Runable介面,在JDK中提供瞭如下阻塞佇列:
1、 ArrayBlockingQueue :基於陣列結構的有界阻塞佇列,按FIFO排序任務;
2、 LinkedBlockingQuene :基於連結串列結構的阻塞佇列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQuene;
3、 SynchronousQuene :一個不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene;
4、 priorityBlockingQuene :具有優先順序的無界阻塞佇列;
6. threadFactory
它是ThreadFactory型別的變數,用來建立新執行緒。預設使用Executors.defaultThreadFactory() 來建立執行緒。使用預設的ThreadFactory來建立執行緒時,會使新建立的執行緒具有相同的NORM_PRIORITY優先順序並且是非守護執行緒,同時也設定了執行緒的名稱。
7. handler
執行緒池的飽和策略,當阻塞佇列滿了,且沒有空閒的工作執行緒,如果繼續提交任務,必須採取一種策略處理該任務,執行緒池提供了 4種策略 :
1、 AbortPolicy :直接丟擲異常,預設策略;
2、 CallerRunsPolicy :用呼叫者所在的執行緒來執行任務;
3、 DiscardOldestPolicy :丟棄阻塞佇列中靠最前的任務,並執行當前任務;
4、 DiscardPolicy :直接丟棄任務;
上面的4種策略都是ThreadPoolExecutor的內部類。
當然也可以 根據應用場景實現 RejectedExecutionHandler 介面,自定義飽和策略,如記錄日誌或持久化儲存不能處理的任務 。( 自定義的才是最常用的 )
【3】執行緒池相關的類分析
1.ExecutorService介面與Executor介面
//定義了一個用於執行Runnable的execute方法 public interface Executor { void execute(Runnable command); } /** * 介面ExecutorService,其中定義了執行緒池的具體行為 * 1,execute(Runnable command):履行Ruannable型別的任務, * 2,submit(task):可用來提交Callable或Runnable任務,並返回代表此任務的Future 物件 * 3,shutdown():在完成已提交的任務後封閉辦事,不再接管新任務, * 4,shutdownNow():停止所有正在履行的任務並封閉辦事。 * 5,isTerminated():測試是否所有任務都履行完畢了。 * 6,isShutdown():測試是否該ExecutorService已被關閉。 */ public interface ExecutorService extends Executor { // 停止執行緒池 void shutdown(); // 立即停止執行緒池,返回尚未執行的任務列表 List<Runnable> shutdownNow(); // 執行緒池是否停止 boolean isShutdown(); // 執行緒池是否終結 boolean isTerminated(); // 等待執行緒池終結 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交Callable型別任務 <T> Future<T> submit(Callable<T> task); // 提交Runnable型別任務,預先知道返回值 <T> Future<T> submit(Runnable task, T result); // 提交Runnable型別任務,對返回值無感知 Future<?> submit(Runnable task); // 永久阻塞 - 提交和執行一個任務列表的所有任務 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 帶超時阻塞 - 提交和執行一個任務列表的所有任務 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 永久阻塞 - 提交和執行一個任務列表的某一個任務 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 帶超時阻塞 - 提交和執行一個任務列表的某一個任務 <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
2.抽象類AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } .... }
3.ThreadPoolExecutor類
public class ThreadPoolExecutor extends AbstractExecutorService { ... public void execute(Runnable command) { if (command == null) int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } ... }
4.ScheduledThreadPoolExecutor類
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { ... public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } public Future<?> submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } } ... }
5.問題點
1)execute方法與submit方法的區別?
【1】最明顯的就是 :
void execute() //提交任務無返回值
Future<?> submit() //任務執行完成後有返回值
【2】另外一個不明顯的就是佇列的提交方法(add【ScheduledThreadPoolExecutor類中使用】與offer【ThreadPoolExecutor類中使用】)
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
明顯當佇列滿了的時候,add方法會丟擲異常,而offer不會 。
【4】執行緒池的狀態分析
1.執行緒池存在5種狀態
1)RUNNING = ‐1 << COUNT_BITS; //高3位為111 執行狀態
2)SHUTDOWN = 0 << COUNT_BITS; //高3位為000 關閉狀態
3)STOP = 1 << COUNT_BITS; //高3位為001 停止狀態
4)TIDYING = 2 << COUNT_BITS; //高3位為010 整理狀態
5)TERMINATED = 3 << COUNT_BITS; //高3位為011 銷燬狀態
2.狀態說明
1、RUNNING
(1) 狀態說明:執行緒池處在RUNNING狀態時,能夠接收新任務,以及對已新增的任務進行處理。
(02) 狀態切換:執行緒池的初始化狀態是RUNNING。換句話說,執行緒池被一旦被建立,就處於RUNNING狀態,並且執行緒池中的任務數為0!
2、 SHUTDOWN
(1)狀態說明:執行緒池處在SHUTDOWN狀態時,不接收新任務,但能處理已新增的任務。
(2)狀態切換:呼叫執行緒池的shutdown()介面時,執行緒池由RUNNING -> SHUTDOWN。
3、STOP
(1)狀態說明:執行緒池處在STOP狀態時,不接收新任務,不處理已新增的任務,並且會中斷正在處理的任務。
(2)狀態切換:呼叫執行緒池的shutdownNow()介面時,執行緒池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1)狀態說明:當所有的任務已終止,ctl記錄的”任務數量”為0,執行緒池會變為TIDYING 狀態。當執行緒池變為TIDYING狀態時,會執行鉤子函式terminated()。terminated()在 ThreadPoolExecutor類中是空的,若使用者想線上程池變為TIDYING時,進行相應的處理; 可以通過過載terminated()函式來實現。
(2)狀態切換:當執行緒池在SHUTDOWN狀態下,阻塞佇列為空並且執行緒池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。 當執行緒池在STOP狀態下,執行緒池中執行的任務為空時,就會由STOP -> TIDYING。
5、 TERMINATED
(1)狀態說明:執行緒池徹底終止,就變成TERMINATED狀態。
(2)狀態切換:執行緒池處在TIDYING狀態時,執行完terminated()之後,就會由 TIDYING -> TERMINATED。
進入TERMINATED的條件如下:
執行緒池不是RUNNING狀態;
執行緒池狀態不是TIDYING狀態或TERMINATED狀態;
如果執行緒池狀態是SHUTDOWN並且workerQueue為空;
workerCount為0;
設定TIDYING狀態成功。
3.彙總
預設情況下,如果不呼叫關閉方法,執行緒池會一直處於 RUNNING 狀態,而執行緒池狀態的轉移有兩個路徑:當呼叫 shutdown() 方法時,執行緒池的狀態會從 RUNNING 到 SHUTDOWN,再到 TIDYING,最後到 TERMENATED 銷燬狀態;當呼叫 shutdownNow() 方法時,執行緒池的狀態會從 RUNNING 到 STOP,再到 TIDYING,最後到 TERMENATED 銷燬狀態。
4.圖示
【5】執行緒池的原始碼解析
1.針對自定義執行緒池的執行分析
1)示例程式碼:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));//自定義執行緒 for (int i = 1; i <= 100; i++) { threadPoolExecutor.execute(new MyTask(i)); }
2)示例結果:
3)示例疑問:
輸出的順序並不是預想的1-5,6-10,11-15,16-20。反而是1-5,16-20,6-10,11-15。( 深入原始碼查探原因 )
2.針對自定義執行緒池ThreadPoolExecutor類的執行分析
1)ThreadPoolExecutor類重要屬性 private final AtomicInteger ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //預設值-536870912 private static final int COUNT_BITS = Integer.SIZE - 3; //預設值29,轉為2進位制11101 private static final int CAPACITY = (1 << COUNT_BITS)-1; //預設值536870911,轉為2進位制11111111111111111111111111111 private static final int RUNNING = -1 << COUNT_BITS; //-536870912 private static final int SHUTDOWN = 0 << COUNT_BITS; //0 private static final int STOP = 1 << COUNT_BITS; //536870912 private static final int TIDYING = 2 << COUNT_BITS; //1073741824 private static final int TERMINATED = 3 << COUNT_BITS; //1610612736 //ctl相關方法 private static int runStateOf(int c) { return c & ~CAPACITY; } //runStateOf:獲取執行狀態;//~x=-(x+1) //預設值0 private static int workerCountOf(int c) { return c & CAPACITY; } //workerCountOf:獲取活動執行緒數; //預設值0,當執行緒數+1是值也會+1 private static int ctlOf(int rs, int wc) { return rs | wc; } //ctlOf:獲取執行狀態和活動執行緒數的值。//預設值-536870912 說明: ctl 是對執行緒池的執行狀態和執行緒池中有效執行緒的數量進行控制的一個欄位, 它包含兩部分的資訊: 執行緒池的執行狀態 (runState) 和執行緒池內有效執行緒的數量 (workerCount), 可以看到,使用了Integer型別來儲存,高3位儲存runState,低29位儲存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。 PS: 1.&和&&的區別 相同點: 最終得到的boolean值結果一樣,都是“並且and”的意思 不同點: &既是邏輯運算子也是位運算子;&&只是邏輯運算子 &不具有短路效果,即左邊false,右邊還會執行;&&具有短路效果,左邊為false,右邊則不執行 2.| 和 || 的區別 相同點: 最終得到的boolean值結果一樣,都是“或者or”的意思 不同點: | 既是邏輯運算子也是位運算子;|| 只是邏輯運算子 | 不具有短路效果,即左邊true,右邊還會執行;|| 具有短路效果,左邊為true,右邊則不執行
2)ThreadPoolExecutor類#execute方法【這裡涉及到一個概念, 提交優先順序 : 核心執行緒>佇列>非核心執行緒】
展示
public void execute(Runnable command) { if (command == null) //不能提交空任務 throw new NullPointerException(); int c = ctl.get(); //獲取執行的執行緒數 //核心執行緒數不滿 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) //在addWorker中建立工作執行緒執行任務 return; c = ctl.get(); } //執行緒還在執行,且核心數滿了,放入執行緒池佇列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//執行緒池是否處於執行狀態,如果不是,則剛塞入的任務要移除 reject(command); //走拒絕策略 //這一步其實沒有很大意義,除非出現執行緒池所有執行緒完蛋了,但是佇列還有任務的情況。(一般是進入時時執行態,然後遇到狀態變更的情況) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //插入佇列不成功,且當前執行緒數數量小於最大執行緒池數量,此時則建立新執行緒執行任務,建立失敗丟擲異常 else if (!addWorker(command, false)) reject(command); //走拒絕策略 }
說明
在正常執行狀態下,執行緒池:核心執行緒執行任務-》塞入佇列-》非核心執行緒執行任務。
體現了在併發不激烈的情況下,儘量減少建立執行緒的操作,用已有的執行緒。而且核心執行緒數並不是提前建立的,而是用到的時候才會建立。而且核心執行緒數不滿,優先以建立執行緒來執行任務。
邏輯展示
3)ThreadPoolExecutor類#addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //獲取執行緒池的狀態 int c = ctl.get(); int rs = runStateOf(c); //如果是非執行狀態(因為只有執行狀態是負數) if (rs >= SHUTDOWN && ! //判斷是不是關閉狀態,不接收新任務,但能處理已新增的任務 //任務是不是空任務,佇列是不是空(這一步說明了關閉狀態不接受任務) (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取活動執行緒數 int wc = workerCountOf(c); //檢驗執行緒數是否大於容量值【這是避免設定的非核心執行緒數沒有限制大小】 //根據傳入引數判斷核心執行緒數與非核心執行緒數是否達到了最大值 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //嘗試增加workerCount數量【也就是活躍執行緒數+1】,如果成功,則跳出第一個for迴圈 if (compareAndIncrementWorkerCount(c)) break retry; // 如果增加workerCount失敗,則重新獲取ctl的值 c = ctl.get(); // 如果當前的執行狀態不等於rs,說明狀態已被改變,返回第一個for迴圈繼續執行 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; //執行緒啟動標誌 boolean workerAdded = false; //執行緒新增標誌 Worker w = null; try { //根據firstTask來建立Worker物件,每一個Worker物件都會建立一個執行緒 w = new Worker(firstTask); //【呼叫1】 final Thread t = w.thread; //如果過執行緒不為空,則試著將執行緒加入工作佇列中 if (t != null) { final ReentrantLock mainLock = this.mainLock; //加重入鎖 mainLock.lock(); try { // 重新獲取執行緒的狀態 int rs = runStateOf(ctl.get()); //是否執行緒池正處於執行狀態 if (rs < SHUTDOWN || //執行緒池是否處於關閉狀態 且 傳入的任務為空(說明關閉狀態還是能新增工作者,但是不允許新增任務) (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) //判斷執行緒是否存活 throw new IllegalThreadStateException(); //workers是一個HashSet,將該worker物件新增其中 workers.add(w); //記錄執行緒工作者的值 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //修改新增標記 workerAdded = true; } } finally { //解鎖 mainLock.unlock(); } //如果新增成功,則啟動執行緒 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//【呼叫2】 } return workerStarted; } //呼叫1 Worker(Runnable firstTask) { setState(-1); // 建立時不允許中斷 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //呼叫2:新增工作者失敗方法 private void addWorkerFailed(Worker w) { //加重入鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //移除工作者 if (w != null) workers.remove(w); //任務數量減一 decrementWorkerCount(); //進入整理狀態() tryTerminate(); } finally { mainLock.unlock(); } }
說明
Worker繼承了AQS,使用AQS來實現獨佔鎖的功能。為什麼不使用ReentrantLock來實現呢?
可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:
1)lock方法一旦獲取了獨佔鎖,表示當前執行緒正在執行任務中;
2)如果正在執行任務,則不應該中斷執行緒;
3)如果該執行緒現在不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時可以對該執行緒進行中斷;
4)執行緒池在執行shutdown方法或tryTerminate方法時會呼叫interruptIdleWorkers方法來中斷空閒的執行緒,interruptIdleWorkers方法會使用tryLock方法來判斷執行緒池中的執行緒是否是空閒狀態;
5)之所以設定為不可重入,是因為我們不希望任務在呼叫像setCorePoolSize這樣的執行緒池控制方法時重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果在任務中呼叫瞭如setCorePoolSize這類執行緒池控制的方法,會中斷正在執行的執行緒。
所以,Worker繼承自AQS( AbstractQueuedSynchronizer類 ),用於判斷執行緒是否空閒以及是否可以被中斷。
此外,在構造方法中執行了setState(-1);,把state變數設定為-1,為什麼這麼做呢?是因為AQS中預設的state是0,如果剛建立了一個Worker物件,還沒有執行任務時,這時就不應該被中斷。tryAcquire方法是根據state是否是0來判斷的,所以,setState(-1);將state設定為-1是為了禁止在執行任務前對執行緒進行中斷。正因為如此,在runWorker方法中會先呼叫Worker物件的unlock方法將state設定為0.
4)ThreadPoolExecutor類#runWorker方法【這裡有涉及到一個概念, 執行優先順序 : 核心執行緒>非核心執行緒>佇列】
程式碼展示
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //取出任務 w.firstTask = null; //將工作者持有任務清空 w.unlock(); //將執行緒置為可中斷,因為建立時候設定不可中斷 boolean completedAbruptly = true; // 是否因為異常退出迴圈 try { //當沒有任務的時候,優先從佇列裡面獲取(自旋方式) while (task != null || (task = getTask()) != null) { w.lock(); //如果執行緒池正在停止,那麼要保證當前執行緒是中斷狀態; if ((runStateAtLeast(ctl.get(), STOP) || // 如果不是的話,則要保證當前執行緒不是中斷狀態;(這裡要考慮在執行該if語句期間可能也執行了shutdownNow方法,shutdownNow方法會把狀態設定為STOP。STOP狀態要中斷執行緒池中的所有執行緒,而這裡使用Thread.interrupted()來判斷是否中斷是為了確保在RUNNING或者SHUTDOWN狀態時執行緒是非中斷狀態的,因為Thread.interrupted()方法會復位中斷的狀態。) (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //如果不是中斷狀態,則呼叫task.run()執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //completedAbruptly變數來表示在執行任務過程中是否出現了異常,在processWorkerExit方法中會對該變數的值進行判斷。 processWorkerExit(w, completedAbruptly); } }
彙總說明
總結一下runWorker方法的執行過程:
1)while迴圈不斷地通過getTask()方法獲取任務;
2)getTask()方法從阻塞佇列中取任務;
3)如果執行緒池正在停止,那麼要保證當前執行緒是中斷狀態,否則要保證當前執行緒不是中斷狀態;呼叫task.run()執行任務;
4)如果task為null則跳出迴圈,執行processWorkerExit()方法;
5)ThreadPoolExecutor類#getTask方法
程式碼展示
private Runnable getTask() { // timeOut變數的值表示上次從阻塞佇列中取任務時是否超時 boolean timedOut = false; for (;;) { //獲取執行緒池狀態 int c = ctl.get(); int rs = runStateOf(c); //是否是非執行狀態(因為如果當前執行緒池狀態的值是SHUTDOWN或以上時,不允許再向阻塞佇列中新增任務。) //如果是非執行狀態:是不是STOP,TIDYING,TERMINATED ,三種狀態之一;或者佇列為空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //如果以上條件滿足,則將workerCount減1並返回null decrementWorkerCount(); //CAS執行緒數減一 return null; } //重新獲取執行緒的數量 int wc = workerCountOf(c); //allowCoreThreadTimeOut預設是false,也就是核心執行緒不允許進行超時; //wc > corePoolSize,表示當前執行緒池中的執行緒數量大於核心執行緒數量;對於超過核心執行緒數量的這些執行緒,需要進行超時控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//有非核心執行緒必定為true //wc > maximumPoolSize的情況是因為可能在此方法執行階段同時執行了setMaximumPoolSize方法; //timed && timedOut 如果為true,表示當前操作需要進行超時控制,並且上次從阻塞佇列中獲取任務發生了超時。 if ((wc > maximumPoolSize || (timed && timedOut)) //接下來判斷,如果有效執行緒數量大於1,或者阻塞佇列是空的,那麼嘗試將workerCount減1; //如果wc == 1時,也就說明當前執行緒是執行緒池中唯一的一個執行緒了。 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; //如果減1失敗,則返回重試。 continue; } try { //根據timed來判斷,如果為true(大概率是有非核心執行緒),則通過阻塞佇列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則返回null; //否則通過take方法,如果這時佇列為空,則take方法會阻塞直到佇列不為空。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; //判斷任務超時 } catch (InterruptedException retry) { // 如果獲取任務時當前執行緒發生了中斷,則設定timedOut為false並返回迴圈重試 timedOut = false; } } }
彙總說明
執行狀態下( 這種情況下會把超出核心執行緒數的部分進入回收,也有一定概率回收核心執行緒 ):
情況1:當有非核心執行緒數的時候,timed為true,導致呼叫poll方法,這時候如果沒有任務且超時,timedOut變為true,第二次進入自旋,timed還是true,進入判斷會走compareAndDecrementWorkerCount,執行緒數減一,並返回null。( 這種情況存在極端情況就是,全部執行緒走到同一邏輯去減,導致全部執行緒數都被減完了【即時有著wc > 1的判斷,因為多執行緒併發情況,你懂得】 )
情況2:沒有非核心執行緒數,timed為false,導致呼叫take方法,執行緒一致阻塞直至,拿到任務。(這時候不存在減少執行緒)
非執行狀態下( 這種情況下是執行緒都會進入回收 ):
情況3:如果執行緒狀態是STOP,TIDYING,TERMINATED,那麼呼叫decrementWorkerCount,執行緒數減一,返回null。
情況4:如果執行緒狀態是SHUTDOWN,佇列不為空,則繼續任務,如果佇列為空,那麼呼叫decrementWorkerCount,執行緒數減一,返回null。
所以,綜上所述, 非核心執行緒和核心執行緒其實都存在被回收的概率 。
6)ThreadPoolExecutor類#processWorkerExit方法
程式碼展示
//主要用於執行緒的清理工作 private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值為true,則說明執行緒執行時出現了異常,需要將workerCount減1; // 如果執行緒執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount進行了減1操作,這裡就不必再減了。 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; //從workers中移除,也就表示著從執行緒池中移除了一個工作執行緒 workers.remove(w); } finally { mainLock.unlock(); } //根據執行緒池狀態進行判斷是否結束執行緒池 tryTerminate(); int c = ctl.get(); //當執行緒池是RUNNING或SHUTDOWN狀態時,如果worker是異常結束,那麼會直接addWorker;如果是其他三種,就不會去補Worker。 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果allowCoreThreadTimeOut=true(可設定),並且等待佇列有任務,至少保留一個worker; if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果allowCoreThreadTimeOut=false(預設值),workerCount不少於corePoolSize。【靠後面的addWorker】 if (workerCountOf(c) >= min) return; } addWorker(null, false); } }
程式碼說明
通過設定allowCoreThreadTimeOut引數,我們可以選擇核心執行緒的回收,在不用的時候保留一個worker。(這種更適用於某時間段高併發,其餘時間段工作量不足的情況)
7)ThreadPoolExecutor類#tryTerminate方法
final void tryTerminate() { for (;;) { int c = ctl.get(); /** * 當前執行緒池的狀態為以下幾種情況時,直接返回: * 1. RUNNING,因為還在執行中,不能停止; * 2. TIDYING或TERMINATED,因為執行緒池中已經沒有正在執行的執行緒了; * 3. SHUTDOWN並且等待佇列非空,這時要執行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果執行緒數量不為0,則中斷一個空閒的工作執行緒,並返回 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 這裡嘗試設定狀態為TIDYING,如果設定成功,則呼叫terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 設定狀態為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
8)ThreadPoolExecutor類#shutdown方法
//shutdown方法要將執行緒池切換到SHUTDOWN狀態,並呼叫 interruptIdleWorkers方法請求中斷所有空閒的worker,最後呼叫tryTerminate嘗試結束執行緒池。 public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判斷 checkShutdownAccess(); // 切換狀態為SHUTDOWN advanceRunState(SHUTDOWN); // 中斷空閒執行緒 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 嘗試結束執行緒池 tryTerminate(); }
9)ThreadPoolExecutor類#interruptIdleWorkers方法
private void interruptIdleWorkers() { interruptIdleWorkers(false); } //interruptIdleWorkers遍歷workers中所有的工作執行緒,若執行緒沒有被中斷tryLock成功,就中斷該執行緒。 //為什麼需要持有mainLock?因為workers是HashSet型別的,不能保證執行緒安全。private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
10)ThreadPoolExecutor類#hutdownNow方法
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中斷所有工作執行緒,無論是否空閒 interruptWorkers(); // 取出佇列中沒有被執行的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
11)問題思考
1. 在runWorker方法中,執行任務時對Worker物件w進行了lock操作,為什麼要在執行任務的時候對每個工作執行緒都加鎖呢 ?
(1)在getTask方法中,如果這時執行緒池的狀態是SHUTDOWN並且workQueue為空,那麼就應該返回null來結束這個工作執行緒,而使執行緒池進入SHUTDOWN狀態需要呼叫shutdown方法;
(2)shutdown方法會呼叫interruptIdleWorkers來中斷空閒的執行緒,interruptIdleWorkers持有mainLock,會遍歷workers來逐個判斷工作執行緒是否空閒。但getTask方法中沒有mainLock;
(3)在getTask中,如果判斷當前執行緒池狀態是RUNNING,並且阻塞佇列為空,那麼會呼叫workQueue.take()進行阻塞;
(4)如果在判斷當前執行緒池狀態是RUNNING後,這時呼叫了shutdown方法把狀態改為了SHUTDOWN,這時如果不進行中斷,那麼當前的工作執行緒在呼叫了workQueue.take()後會一直阻塞而不會被銷燬,因為在SHUTDOWN狀態下不允許再有新的任務新增到workQueue中,這樣一來執行緒池永遠都關閉不了;
(5)由上可知,shutdown方法與getTask方法(從佇列中獲取任務時)存在競態條件;
(6)解決這一問題就需要用到執行緒的中斷,也就是為什麼要用interruptIdleWorkers方法。在呼叫workQueue.take()時,如果發現當前執行緒在執行之前或者執行期間是中斷狀態,則會丟擲InterruptedException,解除阻塞的狀態;
(7)但是要中斷工作執行緒,還要判斷工作執行緒是否是空閒的,如果工作執行緒正在處理任務,就不應該發生中斷;
(8)所以Worker繼承自AQS,在工作執行緒處理任務時會進行lock,interruptIdleWorkers在進行中斷時會使用tryLock來判斷該工作執行緒是否正在處理任務,如果tryLock返回true,說明該工作執行緒當前未執行任務,這時才可以被中斷。
【6】額外拓展
(1)有關阻塞佇列部分(可檢視 java原生阻塞佇列詳解索引 )
(2)有關 Future和Callable的部分(可檢視針對Future部分的詳解)
tryTerminate
- 執行緒池底層原理詳解與原始碼分析
- 30分鐘掌握 Webpack
- 線性迴歸大結局(嶺(Ridge)、 Lasso迴歸原理、公式推導),你想要的這裡都有
- 【前端必會】webpack loader 到底是什麼
- 中心化決議管理——雲端分析
- HashMap底層原理及jdk1.8原始碼解讀
- 詳解JS中 call 方法的實現
- 列印 Logger 日誌時,需不需要再封裝一下工具類?
- 初識設計模式 - 代理模式
- 密碼學奇妙之旅、01 CFB密文反饋模式、AES標準、Golang程式碼
- Springboot之 Mybatis 多資料來源實現
- CAS核心思想、底層實現
- 面試突擊86:SpringBoot 事務不回滾?怎麼解決?
- 基於electron vue element構建專案模板之【打包篇】
- MiniWord .NET Word模板引擎,藉由Word模板和資料簡單、快速生成檔案。
- 認識執行緒,初始併發
- 1-VSCode搭建GD32開發環境
- 初識設計模式 - 原型模式
- 執行緒安全問題的產生條件、解決方式
- 2>&1到底是什麼意思?