線程池底層原理詳解與源碼分析
【1】為什麼要使用線程池?
示例演示:
//設置業務模擬 class MyRunnable implements Runnable { private int count; public MyRunnable(int count) { this.count = count; } public int getCount() { return count; } @Override public void run() { for (int i = 0; i < 100000; i++) { count += i; } System.out.println("結果:"+count); } } //模擬線程池複用線程執行業務 public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; ExecutorService executorService = Executors.newSingleThreadExecutor(); MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { executorService.execute(myRunnable); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("時間:"+(System.currentTimeMillis() - start)); } //模擬每次執行業務都開一個線程 public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { Thread thread = new Thread(myRunnable); thread.start(); thread.join(); } System.out.println("時間:" + (System.currentTimeMillis() - start)); }
示例結果:
採用每次都開一個線程的結果是292毫秒,而線程池的是69毫秒。(隨着業務次數的增多這個數值的差距會越大)
示例説明:
如果每個請求到達就創建一個新線程,開銷是相當大的。在實際使用中, 服務器在創建和銷燬線程上花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的用户請求的時間和資源要多的多 。除了創建和銷燬線程的開銷之外,活動的線程也需要消耗系統資源。
如果併發的請求數量非常多,但每個線程執行的時間很短,這樣就會頻繁的創建和銷燬線程,如此一來會大大降低系統的效率。可能出現服務器在為每個請求創建新線程和銷燬線程上花費的時間和消耗的系統資源要比處理實際的用户請求的時間和資源更多 。(説明了我們什麼時候使用線程池:1.單個任務處理時間比較短;2.需要處理的任務數量很大;)
線程池主要用來解決線程生命週期開銷問題和資源不足問題。 通過對多個任務重複使用線程,線程創建的開銷就被分攤到了多個任務上了,而且由於在請求到達時線程已經存在,所以消除了線程創建所帶來的延遲 。這樣,就可以立即為請求服務,使用應用程序響應更快。另外, 通過適當的調整線程中的線程數目可以防止出現資源不足的情況 。
【2】線程池的介紹
(1) 線程池優勢
1. 重用存在的線程,減少線程創建,消亡的開銷,提高性能
2.提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
3.提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。
(2)常見線程池
1. newSingleThreadExecutor : 單個線程的線程池,即線程池中每次只有一個線程工作,單線程串行執行任務
2. newFixedThreadExecutor (n) : 固定數量的線程池,每提交一個任務就是一個線程,直到達到線程池的最大數量,然後後面進入等待隊列直到前面的任務完成才繼續執行
3. newCacheThreadExecutor (推薦使用) : 可緩存線程池, 當線程池大小超過了處理任務所需的線程,那麼就會回收部分空閒(一般是60秒無執行)的線程,當有任務來時,又智能的添加新線程來執行。
4. newScheduleThreadExecutor : 大小無限制的線程池,支持定時和週期性的執行線程
5.常見線程池的説明
在阿里的開發手冊中其實不推薦我們使用默認的線程池,為什麼?
【1】Executors 返回的線程池對象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM 。
2)CachedThreadPool 和 ScheduledThreadPool:
允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM 。
【2】其次newCacheThreadExecutor,沒有核心線程數,且非核心線程數是最大值,不斷創建線程容易出現CPU100%的問題。
(3)默認線程池
1. ThreadPoolExecutor
1)説明
實際上不管是newSingleThreadExecutor,newFixedThreadExecutor還是newCacheThreadExecutor,他們都是使用ThreadPoolExecutor去生成的。
只不過由於參數不同導致產生的線程池的不同,因此,我們常使用是ThreadPoolExecutor去自建自己想要的線程池。
2)參數解析
1. corePoolSize
線程池中的核心線程數,當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等於corePoolSize;如果當前線程數為corePoolSize,繼續提交的任務被保存到 阻塞隊列中,等待被執行;如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有核心線程。
2. maximumPoolSize
線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小於maximumPoolSize;
3. keepAliveTime
線程池維護線程所允許的空閒時間。當線程池中的線程數量大於corePoolSize的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷燬,而是會等待,直到等待的時間超過了keepAliveTime;
4. unit
keepAliveTime的單位;
5. workQueue
用來保存等待被執行的任務的阻塞隊列,且任務必須實現Runable接口,在JDK中提供瞭如下阻塞隊列:
1、 ArrayBlockingQueue :基於數組結構的有界阻塞隊列,按FIFO排序任務;
2、 LinkedBlockingQuene :基於鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQuene;
3、 SynchronousQuene :一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene;
4、 priorityBlockingQuene :具有優先級的無界阻塞隊列;
6. threadFactory
它是ThreadFactory類型的變量,用來創建新線程。默認使用Executors.defaultThreadFactory() 來創建線程。使用默認的ThreadFactory來創建線程時,會使新創建的線程具有相同的NORM_PRIORITY優先級並且是非守護線程,同時也設置了線程的名稱。
7. handler
線程池的飽和策略,當阻塞隊列滿了,且沒有空閒的工作線程,如果繼續提交任務,必須採取一種策略處理該任務,線程池提供了 4種策略 :
1、 AbortPolicy :直接拋出異常,默認策略;
2、 CallerRunsPolicy :用調用者所在的線程來執行任務;
3、 DiscardOldestPolicy :丟棄阻塞隊列中靠最前的任務,並執行當前任務;
4、 DiscardPolicy :直接丟棄任務;
上面的4種策略都是ThreadPoolExecutor的內部類。
當然也可以 根據應用場景實現 RejectedExecutionHandler 接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務 。( 自定義的才是最常用的 )
【3】線程池相關的類分析
1.ExecutorService接口與Executor接口
//定義了一個用於執行Runnable的execute方法 public interface Executor { void execute(Runnable command); } /** * 接口ExecutorService,其中定義了線程池的具體行為 * 1,execute(Runnable command):履行Ruannable類型的任務, * 2,submit(task):可用來提交Callable或Runnable任務,並返回代表此任務的Future 對象 * 3,shutdown():在完成已提交的任務後封閉辦事,不再接管新任務, * 4,shutdownNow():停止所有正在履行的任務並封閉辦事。 * 5,isTerminated():測試是否所有任務都履行完畢了。 * 6,isShutdown():測試是否該ExecutorService已被關閉。 */ public interface ExecutorService extends Executor { // 停止線程池 void shutdown(); // 立即停止線程池,返回尚未執行的任務列表 List<Runnable> shutdownNow(); // 線程池是否停止 boolean isShutdown(); // 線程池是否終結 boolean isTerminated(); // 等待線程池終結 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交Callable類型任務 <T> Future<T> submit(Callable<T> task); // 提交Runnable類型任務,預先知道返回值 <T> Future<T> submit(Runnable task, T result); // 提交Runnable類型任務,對返回值無感知 Future<?> submit(Runnable task); // 永久阻塞 - 提交和執行一個任務列表的所有任務 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 帶超時阻塞 - 提交和執行一個任務列表的所有任務 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 永久阻塞 - 提交和執行一個任務列表的某一個任務 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 帶超時阻塞 - 提交和執行一個任務列表的某一個任務 <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
2.抽象類AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } .... }
3.ThreadPoolExecutor類
public class ThreadPoolExecutor extends AbstractExecutorService { ... public void execute(Runnable command) { if (command == null) int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } ... }
4.ScheduledThreadPoolExecutor類
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { ... public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } public Future<?> submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } } ... }
5.問題點
1)execute方法與submit方法的區別?
【1】最明顯的就是 :
void execute() //提交任務無返回值
Future<?> submit() //任務執行完成後有返回值
【2】另外一個不明顯的就是隊列的提交方法(add【ScheduledThreadPoolExecutor類中使用】與offer【ThreadPoolExecutor類中使用】)
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
明顯當隊列滿了的時候,add方法會拋出異常,而offer不會 。
【4】線程池的狀態分析
1.線程池存在5種狀態
1)RUNNING = ‐1 << COUNT_BITS; //高3位為111 運行狀態
2)SHUTDOWN = 0 << COUNT_BITS; //高3位為000 關閉狀態
3)STOP = 1 << COUNT_BITS; //高3位為001 停止狀態
4)TIDYING = 2 << COUNT_BITS; //高3位為010 整理狀態
5)TERMINATED = 3 << COUNT_BITS; //高3位為011 銷燬狀態
2.狀態説明
1、RUNNING
(1) 狀態説明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(02) 狀態切換:線程池的初始化狀態是RUNNING。換句話説,線程池被一旦被創建,就處於RUNNING狀態,並且線程池中的任務數為0!
2、 SHUTDOWN
(1)狀態説明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
(2)狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。
3、STOP
(1)狀態説明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,並且會中斷正在處理的任務。
(2)狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1)狀態説明:當所有的任務已終止,ctl記錄的”任務數量”為0,線程池會變為TIDYING 狀態。當線程池變為TIDYING狀態時,會執行鈎子函數terminated()。terminated()在 ThreadPoolExecutor類中是空的,若用户想在線程池變為TIDYING時,進行相應的處理; 可以通過重載terminated()函數來實現。
(2)狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空並且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。 當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。
5、 TERMINATED
(1)狀態説明:線程池徹底終止,就變成TERMINATED狀態。
(2)狀態切換:線程池處在TIDYING狀態時,執行完terminated()之後,就會由 TIDYING -> TERMINATED。
進入TERMINATED的條件如下:
線程池不是RUNNING狀態;
線程池狀態不是TIDYING狀態或TERMINATED狀態;
如果線程池狀態是SHUTDOWN並且workerQueue為空;
workerCount為0;
設置TIDYING狀態成功。
3.彙總
默認情況下,如果不調用關閉方法,線程池會一直處於 RUNNING 狀態,而線程池狀態的轉移有兩個路徑:當調用 shutdown() 方法時,線程池的狀態會從 RUNNING 到 SHUTDOWN,再到 TIDYING,最後到 TERMENATED 銷燬狀態;當調用 shutdownNow() 方法時,線程池的狀態會從 RUNNING 到 STOP,再到 TIDYING,最後到 TERMENATED 銷燬狀態。
4.圖示
【5】線程池的源碼解析
1.針對自定義線程池的運行分析
1)示例代碼:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));//自定義線程 for (int i = 1; i <= 100; i++) { threadPoolExecutor.execute(new MyTask(i)); }
2)示例結果:
3)示例疑問:
輸出的順序並不是預想的1-5,6-10,11-15,16-20。反而是1-5,16-20,6-10,11-15。( 深入源碼查探原因 )
2.針對自定義線程池ThreadPoolExecutor類的運行分析
1)ThreadPoolExecutor類重要屬性 private final AtomicInteger ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //默認值-536870912 private static final int COUNT_BITS = Integer.SIZE - 3; //默認值29,轉為2進制11101 private static final int CAPACITY = (1 << COUNT_BITS)-1; //默認值536870911,轉為2進制11111111111111111111111111111 private static final int RUNNING = -1 << COUNT_BITS; //-536870912 private static final int SHUTDOWN = 0 << COUNT_BITS; //0 private static final int STOP = 1 << COUNT_BITS; //536870912 private static final int TIDYING = 2 << COUNT_BITS; //1073741824 private static final int TERMINATED = 3 << COUNT_BITS; //1610612736 //ctl相關方法 private static int runStateOf(int c) { return c & ~CAPACITY; } //runStateOf:獲取運行狀態;//~x=-(x+1) //默認值0 private static int workerCountOf(int c) { return c & CAPACITY; } //workerCountOf:獲取活動線程數; //默認值0,當線程數+1是值也會+1 private static int ctlOf(int rs, int wc) { return rs | wc; } //ctlOf:獲取運行狀態和活動線程數的值。//默認值-536870912 説明: ctl 是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段, 它包含兩部分的信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount), 可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。 PS: 1.&和&&的區別 相同點: 最終得到的boolean值結果一樣,都是“並且and”的意思 不同點: &既是邏輯運算符也是位運算符;&&只是邏輯運算符 &不具有短路效果,即左邊false,右邊還會執行;&&具有短路效果,左邊為false,右邊則不執行 2.| 和 || 的區別 相同點: 最終得到的boolean值結果一樣,都是“或者or”的意思 不同點: | 既是邏輯運算符也是位運算符;|| 只是邏輯運算符 | 不具有短路效果,即左邊true,右邊還會執行;|| 具有短路效果,左邊為true,右邊則不執行
2)ThreadPoolExecutor類#execute方法【這裏涉及到一個概念, 提交優先級 : 核心線程>隊列>非核心線程】
展示
public void execute(Runnable command) { if (command == null) //不能提交空任務 throw new NullPointerException(); int c = ctl.get(); //獲取運行的線程數 //核心線程數不滿 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) //在addWorker中創建工作線程執行任務 return; c = ctl.get(); } //線程還在運行,且核心數滿了,放入線程池隊列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//線程池是否處於運行狀態,如果不是,則剛塞入的任務要移除 reject(command); //走拒絕策略 //這一步其實沒有很大意義,除非出現線程池所有線程完蛋了,但是隊列還有任務的情況。(一般是進入時時運行態,然後遇到狀態變更的情況) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //插入隊列不成功,且當前線程數數量小於最大線程池數量,此時則創建新線程執行任務,創建失敗拋出異常 else if (!addWorker(command, false)) reject(command); //走拒絕策略 }
説明
在正常運行狀態下,線程池:核心線程執行任務-》塞入隊列-》非核心線程執行任務。
體現了在併發不激烈的情況下,儘量減少創建線程的操作,用已有的線程。而且核心線程數並不是提前創建的,而是用到的時候才會創建。而且核心線程數不滿,優先以創建線程來執行任務。
邏輯展示
3)ThreadPoolExecutor類#addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //獲取線程池的狀態 int c = ctl.get(); int rs = runStateOf(c); //如果是非運行狀態(因為只有運行狀態是負數) if (rs >= SHUTDOWN && ! //判斷是不是關閉狀態,不接收新任務,但能處理已添加的任務 //任務是不是空任務,隊列是不是空(這一步説明了關閉狀態不接受任務) (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取活動線程數 int wc = workerCountOf(c); //檢驗線程數是否大於容量值【這是避免設置的非核心線程數沒有限制大小】 //根據傳入參數判斷核心線程數與非核心線程數是否達到了最大值 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //嘗試增加workerCount數量【也就是活躍線程數+1】,如果成功,則跳出第一個for循環 if (compareAndIncrementWorkerCount(c)) break retry; // 如果增加workerCount失敗,則重新獲取ctl的值 c = ctl.get(); // 如果當前的運行狀態不等於rs,説明狀態已被改變,返回第一個for循環繼續執行 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; //線程啟動標誌 boolean workerAdded = false; //線程添加標誌 Worker w = null; try { //根據firstTask來創建Worker對象,每一個Worker對象都會創建一個線程 w = new Worker(firstTask); //【調用1】 final Thread t = w.thread; //如果過線程不為空,則試着將線程加入工作隊列中 if (t != null) { final ReentrantLock mainLock = this.mainLock; //加重入鎖 mainLock.lock(); try { // 重新獲取線程的狀態 int rs = runStateOf(ctl.get()); //是否線程池正處於運行狀態 if (rs < SHUTDOWN || //線程池是否處於關閉狀態 且 傳入的任務為空(説明關閉狀態還是能添加工作者,但是不允許添加任務) (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) //判斷線程是否存活 throw new IllegalThreadStateException(); //workers是一個HashSet,將該worker對象添加其中 workers.add(w); //記錄線程工作者的值 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //修改添加標記 workerAdded = true; } } finally { //解鎖 mainLock.unlock(); } //如果添加成功,則啟動線程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//【調用2】 } return workerStarted; } //調用1 Worker(Runnable firstTask) { setState(-1); // 創建時不允許中斷 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //調用2:添加工作者失敗方法 private void addWorkerFailed(Worker w) { //加重入鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //移除工作者 if (w != null) workers.remove(w); //任務數量減一 decrementWorkerCount(); //進入整理狀態() tryTerminate(); } finally { mainLock.unlock(); } }
説明
Worker繼承了AQS,使用AQS來實現獨佔鎖的功能。為什麼不使用ReentrantLock來實現呢?
可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:
1)lock方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中;
2)如果正在執行任務,則不應該中斷線程;
3)如果該線程現在不是獨佔鎖的狀態,也就是空閒的狀態,説明它沒有在處理任務,這時可以對該線程進行中斷;
4)線程池在執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers方法來中斷空閒的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閒狀態;
5)之所以設置為不可重入,是因為我們不希望任務在調用像setCorePoolSize這樣的線程池控制方法時重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果在任務中調用瞭如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線程。
所以,Worker繼承自AQS( AbstractQueuedSynchronizer類 ),用於判斷線程是否空閒以及是否可以被中斷。
此外,在構造方法中執行了setState(-1);,把state變量設置為-1,為什麼這麼做呢?是因為AQS中默認的state是0,如果剛創建了一個Worker對象,還沒有執行任務時,這時就不應該被中斷。tryAcquire方法是根據state是否是0來判斷的,所以,setState(-1);將state設置為-1是為了禁止在執行任務前對線程進行中斷。正因為如此,在runWorker方法中會先調用Worker對象的unlock方法將state設置為0.
4)ThreadPoolExecutor類#runWorker方法【這裏有涉及到一個概念, 執行優先級 : 核心線程>非核心線程>隊列】
代碼展示
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //取出任務 w.firstTask = null; //將工作者持有任務清空 w.unlock(); //將線程置為可中斷,因為創建時候設置不可中斷 boolean completedAbruptly = true; // 是否因為異常退出循環 try { //當沒有任務的時候,優先從隊列裏面獲取(自旋方式) while (task != null || (task = getTask()) != null) { w.lock(); //如果線程池正在停止,那麼要保證當前線程是中斷狀態; if ((runStateAtLeast(ctl.get(), STOP) || // 如果不是的話,則要保證當前線程不是中斷狀態;(這裏要考慮在執行該if語句期間可能也執行了shutdownNow方法,shutdownNow方法會把狀態設置為STOP。STOP狀態要中斷線程池中的所有線程,而這裏使用Thread.interrupted()來判斷是否中斷是為了確保在RUNNING或者SHUTDOWN狀態時線程是非中斷狀態的,因為Thread.interrupted()方法會復位中斷的狀態。) (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //如果不是中斷狀態,則調用task.run()執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //completedAbruptly變量來表示在執行任務過程中是否出現了異常,在processWorkerExit方法中會對該變量的值進行判斷。 processWorkerExit(w, completedAbruptly); } }
彙總説明
總結一下runWorker方法的執行過程:
1)while循環不斷地通過getTask()方法獲取任務;
2)getTask()方法從阻塞隊列中取任務;
3)如果線程池正在停止,那麼要保證當前線程是中斷狀態,否則要保證當前線程不是中斷狀態;調用task.run()執行任務;
4)如果task為null則跳出循環,執行processWorkerExit()方法;
5)ThreadPoolExecutor類#getTask方法
代碼展示
private Runnable getTask() { // timeOut變量的值表示上次從阻塞隊列中取任務時是否超時 boolean timedOut = false; for (;;) { //獲取線程池狀態 int c = ctl.get(); int rs = runStateOf(c); //是否是非運行狀態(因為如果當前線程池狀態的值是SHUTDOWN或以上時,不允許再向阻塞隊列中添加任務。) //如果是非運行狀態:是不是STOP,TIDYING,TERMINATED ,三種狀態之一;或者隊列為空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //如果以上條件滿足,則將workerCount減1並返回null decrementWorkerCount(); //CAS線程數減一 return null; } //重新獲取線程的數量 int wc = workerCountOf(c); //allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時; //wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量;對於超過核心線程數量的這些線程,需要進行超時控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//有非核心線程必定為true //wc > maximumPoolSize的情況是因為可能在此方法執行階段同時執行了setMaximumPoolSize方法; //timed && timedOut 如果為true,表示當前操作需要進行超時控制,並且上次從阻塞隊列中獲取任務發生了超時。 if ((wc > maximumPoolSize || (timed && timedOut)) //接下來判斷,如果有效線程數量大於1,或者阻塞隊列是空的,那麼嘗試將workerCount減1; //如果wc == 1時,也就説明當前線程是線程池中唯一的一個線程了。 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; //如果減1失敗,則返回重試。 continue; } try { //根據timed來判斷,如果為true(大概率是有非核心線程),則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則返回null; //否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; //判斷任務超時 } catch (InterruptedException retry) { // 如果獲取任務時當前線程發生了中斷,則設置timedOut為false並返回循環重試 timedOut = false; } } }
彙總説明
運行狀態下( 這種情況下會把超出核心線程數的部分進入回收,也有一定概率回收核心線程 ):
情況1:當有非核心線程數的時候,timed為true,導致調用poll方法,這時候如果沒有任務且超時,timedOut變為true,第二次進入自旋,timed還是true,進入判斷會走compareAndDecrementWorkerCount,線程數減一,並返回null。( 這種情況存在極端情況就是,全部線程走到同一邏輯去減,導致全部線程數都被減完了【即時有着wc > 1的判斷,因為多線程併發情況,你懂得】 )
情況2:沒有非核心線程數,timed為false,導致調用take方法,線程一致阻塞直至,拿到任務。(這時候不存在減少線程)
非運行狀態下( 這種情況下是線程都會進入回收 ):
情況3:如果線程狀態是STOP,TIDYING,TERMINATED,那麼調用decrementWorkerCount,線程數減一,返回null。
情況4:如果線程狀態是SHUTDOWN,隊列不為空,則繼續任務,如果隊列為空,那麼調用decrementWorkerCount,線程數減一,返回null。
所以,綜上所述, 非核心線程和核心線程其實都存在被回收的概率 。
6)ThreadPoolExecutor類#processWorkerExit方法
代碼展示
//主要用於線程的清理工作 private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值為true,則説明線程執行時出現了異常,需要將workerCount減1; // 如果線程執行時沒有出現異常,説明在getTask()方法中已經已經對workerCount進行了減1操作,這裏就不必再減了。 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; //從workers中移除,也就表示着從線程池中移除了一個工作線程 workers.remove(w); } finally { mainLock.unlock(); } //根據線程池狀態進行判斷是否結束線程池 tryTerminate(); int c = ctl.get(); //當線程池是RUNNING或SHUTDOWN狀態時,如果worker是異常結束,那麼會直接addWorker;如果是其他三種,就不會去補Worker。 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果allowCoreThreadTimeOut=true(可設置),並且等待隊列有任務,至少保留一個worker; if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果allowCoreThreadTimeOut=false(默認值),workerCount不少於corePoolSize。【靠後面的addWorker】 if (workerCountOf(c) >= min) return; } addWorker(null, false); } }
代碼説明
通過設置allowCoreThreadTimeOut參數,我們可以選擇核心線程的回收,在不用的時候保留一個worker。(這種更適用於某時間段高併發,其餘時間段工作量不足的情況)
7)ThreadPoolExecutor類#tryTerminate方法
final void tryTerminate() { for (;;) { int c = ctl.get(); /** * 當前線程池的狀態為以下幾種情況時,直接返回: * 1. RUNNING,因為還在運行中,不能停止; * 2. TIDYING或TERMINATED,因為線程池中已經沒有正在運行的線程了; * 3. SHUTDOWN並且等待隊列非空,這時要執行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果線程數量不為0,則中斷一個空閒的工作線程,並返回 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 這裏嘗試設置狀態為TIDYING,如果設置成功,則調用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 設置狀態為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
8)ThreadPoolExecutor類#shutdown方法
//shutdown方法要將線程池切換到SHUTDOWN狀態,並調用 interruptIdleWorkers方法請求中斷所有空閒的worker,最後調用tryTerminate嘗試結束線程池。 public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判斷 checkShutdownAccess(); // 切換狀態為SHUTDOWN advanceRunState(SHUTDOWN); // 中斷空閒線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 嘗試結束線程池 tryTerminate(); }
9)ThreadPoolExecutor類#interruptIdleWorkers方法
private void interruptIdleWorkers() { interruptIdleWorkers(false); } //interruptIdleWorkers遍歷workers中所有的工作線程,若線程沒有被中斷tryLock成功,就中斷該線程。 //為什麼需要持有mainLock?因為workers是HashSet類型的,不能保證線程安全。private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
10)ThreadPoolExecutor類#hutdownNow方法
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中斷所有工作線程,無論是否空閒 interruptWorkers(); // 取出隊列中沒有被執行的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
11)問題思考
1. 在runWorker方法中,執行任務時對Worker對象w進行了lock操作,為什麼要在執行任務的時候對每個工作線程都加鎖呢 ?
(1)在getTask方法中,如果這時線程池的狀態是SHUTDOWN並且workQueue為空,那麼就應該返回null來結束這個工作線程,而使線程池進入SHUTDOWN狀態需要調用shutdown方法;
(2)shutdown方法會調用interruptIdleWorkers來中斷空閒的線程,interruptIdleWorkers持有mainLock,會遍歷workers來逐個判斷工作線程是否空閒。但getTask方法中沒有mainLock;
(3)在getTask中,如果判斷當前線程池狀態是RUNNING,並且阻塞隊列為空,那麼會調用workQueue.take()進行阻塞;
(4)如果在判斷當前線程池狀態是RUNNING後,這時調用了shutdown方法把狀態改為了SHUTDOWN,這時如果不進行中斷,那麼當前的工作線程在調用了workQueue.take()後會一直阻塞而不會被銷燬,因為在SHUTDOWN狀態下不允許再有新的任務添加到workQueue中,這樣一來線程池永遠都關閉不了;
(5)由上可知,shutdown方法與getTask方法(從隊列中獲取任務時)存在競態條件;
(6)解決這一問題就需要用到線程的中斷,也就是為什麼要用interruptIdleWorkers方法。在調用workQueue.take()時,如果發現當前線程在執行之前或者執行期間是中斷狀態,則會拋出InterruptedException,解除阻塞的狀態;
(7)但是要中斷工作線程,還要判斷工作線程是否是空閒的,如果工作線程正在處理任務,就不應該發生中斷;
(8)所以Worker繼承自AQS,在工作線程處理任務時會進行lock,interruptIdleWorkers在進行中斷時會使用tryLock來判斷該工作線程是否正在處理任務,如果tryLock返回true,説明該工作線程當前未執行任務,這時才可以被中斷。
【6】額外拓展
(1)有關阻塞隊列部分(可查看 java原生阻塞隊列詳解索引 )
(2)有關 Future和Callable的部分(可查看針對Future部分的詳解)
tryTerminate
- 線程池底層原理詳解與源碼分析
- 30分鐘掌握 Webpack
- 線性迴歸大結局(嶺(Ridge)、 Lasso迴歸原理、公式推導),你想要的這裏都有
- 【前端必會】webpack loader 到底是什麼
- 中心化決議管理——雲端分析
- HashMap底層原理及jdk1.8源碼解讀
- 詳解JS中 call 方法的實現
- 打印 Logger 日誌時,需不需要再封裝一下工具類?
- 初識設計模式 - 代理模式
- 密碼學奇妙之旅、01 CFB密文反饋模式、AES標準、Golang代碼
- Springboot之 Mybatis 多數據源實現
- CAS核心思想、底層實現
- 面試突擊86:SpringBoot 事務不回滾?怎麼解決?
- 基於electron vue element構建項目模板之【打包篇】
- MiniWord .NET Word模板引擎,藉由Word模板和數據簡單、快速生成文件。
- 認識線程,初始併發
- 1-VSCode搭建GD32開發環境
- 初識設計模式 - 原型模式
- 線程安全問題的產生條件、解決方式
- 2>&1到底是什麼意思?