線程池的線程是如何複用的
開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第7天,點擊查看活動詳情
前言
進程和線程的關係相信大家都知道,這裏我就不做過多的解釋了,既然一個進程是由多個線程組成的,那麼線程池又是由若干個線程隊列組成的,在併發量比較高的情景下,我們通常會去創建線程池就執行任務,而不單一的創建多個線程去執行任務,因為線程的創建的一系列動作,是需要資源開銷的,如果頻繁的對線程創建銷燬,其實本身是一種很浪費資源的,就更談不上提高效率了。
一般都會創建線程池將線程統一管理,並且還會引入阻塞和非阻塞隊列,接收需要排隊處理的任務,但是線程池裏的線程是在處理完任務就會進行銷燬麼,其實並不是這樣的,下面我們一起來對線程池裏線程是如何複用的進行分析。
使用線程池
使用線程池原因
1.複用已創建的線程,減少線程創建、銷燬開銷。 2.可以根據自身系統的承載能力,合理對線程池線程數量進行控制。 3.控制併發數,保護系統。
```
private static void creatThreadPool() throws InterruptedException {
List
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "thread" + i);
thread.start();
threadList.add(thread);
TimeUnit.MILLISECONDS.sleep(1);
}
long end = System.currentTimeMillis();
long needTime = end - start;
log.info("創建100個線程所花費的時間:" + needTime + "ms");
}
public static void main(String[] args) throws InterruptedException { creatThreadPool(); }
```
創建100個線程需要264ms,平均一個線程的創建需要2.2ms左右,線程執行任務可以只需要不到1ms,那麼這樣看來創建線程是不划算的。
這裏除裏JDK自帶的四種線程池類型,簡單介紹下jdk自帶四種線程池。
1.newCachedThreadPool:可緩存的的無界線程池,可以自動線程回收,通常執行短期異步的任務。
2.newFixThreadPool:固定數量的線程池,控制併發數。
3.newSingleThreadPool:單線程工作的線程池,所有任務按照FIFO先進先出的原則執行。
4.newScheduleThreadPool:可定期執行的線程池,可指定執行時間和執行次數。
通常情況下,在阿里的開發手冊上寫不推薦使用Executors創建線程,也就是線程池的頂級接口,jdk自帶的線程池創建的時候是沒有核心線程數的,不斷的創建對象,那麼就會存在內存溢出的風險。
線程池的工作流程
一般創建線程池還是使用ThreadPoolExecutor創建,它的上接口是ExecutorService,所有説真正創建線程池是用ExecutorService創建。 7大核心參數這裏就不多説了,直接説線程池的工作流程。
1.首先當運行的線程池<corePoolSize(核心線程數),就會創建線程執行這個任務
2.線程池線程數> corePoolSize(核心線程數),任務放入隊列。
3.隊列已滿,當前運行的線程數
4.當前線程不需要執行任務,也不能讓它一直存在着佔用資源,超出keepAliveTime,運行線程數> corePoolSize,這線程會被回收掉,這樣做主要是控制核心線程裏線程數量。
線程複用
首先看ThreadPoolExecutor源碼,execute線程池執行入口
``` public void execute(Runnable command) { if (command == null) throw new NullPointerException();
//當前線程數小於核心線程數
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//加入等待隊列裏排隊處理
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//檢查工作線程停止工作是否需要移除,觸發拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
//二次檢查
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//無法提交線程則觸發拒絕策略
else if (!addWorker(command, false))
reject(command);
}
```
這裏看到每個if判斷裏都存在addWorker方法,那麼這個方法肯定是線程是否複用的重點,
``` Worker w = null; try {
//將任務放到Worker工作線程裏面,
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//hashset 集合裏存放 Worker對象
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;
```
Worker是個final修飾的內部類,意味着不能被其他類繼承,那麼線程複用只能在這一個類裏面進行,接着看Worker的run方法裏面執行的runWorker方法,這個是線程複用的核心方法。
``` final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //獲取線程裏面執行的任務 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //如果任務不為空 || 重新拿取線程裏的任務 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //判斷線程的狀態,並執行對應的拒絕策略 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }
```
getTask方法重新拿取線程裏的任務, 前面一系列的判斷主要是來檢查線程的狀態,以及線程池線程的數量,其核心主要是線程數量是否超過了核心線程數,如果超過了則會進入workQueue工作隊列,workQueue.poll非核心線程會一直去工作隊列裏獲取任務,非核心線程已經滿了,則會workQueue.take()核心線程去獲取任務,前面的runWorker方法是有while循環的,這樣就會一直執行下去,循環拿取任務,如果這個時候工作隊列裏面沒有隊列,超過keepAliveTime線程存活時間還沒有拿到任務,則會對對應線程進行銷燬。
``` private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
```
總結
在日常開發中,對線程池的優化也是比較重要的,如果線程池的核心線程數和最大線程數都不是隨意定義的,還是要結合本身服務器cpu的情況,以及阻塞隊列的使用,在一定情況下能緩解線程的壓力,本身阻塞隊列是帶有阻塞和喚醒的功能,阻塞隊列的長度也是需要根據實際開大的業務場景去定義的,最後運用好線程池,在處理高併發的業務場景下還是尤為關鍵的一項技術。