線程池的線程是如何複用的

語言: CN / TW / HK

開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第7天,點擊查看活動詳情

前言

進程和線程的關係相信大家都知道,這裏我就不做過多的解釋了,既然一個進程是由多個線程組成的,那麼線程池又是由若干個線程隊列組成的,在併發量比較高的情景下,我們通常會去創建線程池就執行任務,而不單一的創建多個線程去執行任務,因為線程的創建的一系列動作,是需要資源開銷的,如果頻繁的對線程創建銷燬,其實本身是一種很浪費資源的,就更談不上提高效率了。

一般都會創建線程池將線程統一管理,並且還會引入阻塞和非阻塞隊列,接收需要排隊處理的任務,但是線程池裏的線程是在處理完任務就會進行銷燬麼,其實並不是這樣的,下面我們一起來對線程池裏線程是如何複用的進行分析。

使用線程池

使用線程池原因

1.複用已創建的線程,減少線程創建、銷燬開銷。 2.可以根據自身系統的承載能力,合理對線程池線程數量進行控制。 3.控制併發數,保護系統。

``` private static void creatThreadPool() throws InterruptedException { List threadList = new ArrayList<>(); long start = System.currentTimeMillis(); log.info("創建線程池開始"); for (int i = 0; i < 100; i++) { Thread thread = new Thread(() -> { try { TimeUnit.SECONDS.sleep(10);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "thread" + i);
    thread.start();
    threadList.add(thread);
    TimeUnit.MILLISECONDS.sleep(1);
}
long end = System.currentTimeMillis();
long needTime = end - start;
log.info("創建100個線程所花費的時間:" + needTime + "ms");

}

public static void main(String[] args) throws InterruptedException { creatThreadPool(); }

```

屏幕快照 2022-12-18 上午11.01.58.png 創建100個線程需要264ms,平均一個線程的創建需要2.2ms左右,線程執行任務可以只需要不到1ms,那麼這樣看來創建線程是不划算的。

這裏除裏JDK自帶的四種線程池類型,簡單介紹下jdk自帶四種線程池。

1.newCachedThreadPool:可緩存的的無界線程池,可以自動線程回收,通常執行短期異步的任務。

2.newFixThreadPool:固定數量的線程池,控制併發數。

3.newSingleThreadPool:單線程工作的線程池,所有任務按照FIFO先進先出的原則執行。

4.newScheduleThreadPool:可定期執行的線程池,可指定執行時間和執行次數。

通常情況下,在阿里的開發手冊上寫不推薦使用Executors創建線程,也就是線程池的頂級接口,jdk自帶的線程池創建的時候是沒有核心線程數的,不斷的創建對象,那麼就會存在內存溢出的風險。

線程池的工作流程

一般創建線程池還是使用ThreadPoolExecutor創建,它的上接口是ExecutorService,所有説真正創建線程池是用ExecutorService創建。 7大核心參數這裏就不多説了,直接説線程池的工作流程。

1.首先當運行的線程池<corePoolSize(核心線程數),就會創建線程執行這個任務

2.線程池線程數> corePoolSize(核心線程數),任務放入隊列。

3.隊列已滿,當前運行的線程數 MaxImumPoolSize(最大線程數),使用Handler拒絕策略,當然不能丟棄任務,一般使用CallerRunsPolicy使用調用線程執行任務。

4.當前線程不需要執行任務,也不能讓它一直存在着佔用資源,超出keepAliveTime,運行線程數> corePoolSize,這線程會被回收掉,這樣做主要是控制核心線程裏線程數量。

線程複用

首先看ThreadPoolExecutor源碼,execute線程池執行入口

``` public void execute(Runnable command) { if (command == null) throw new NullPointerException();

 //當前線程數小於核心線程數
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
//加入等待隊列裏排隊處理
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get(); 
 //檢查工作線程停止工作是否需要移除,觸發拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
 //二次檢查
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
 //無法提交線程則觸發拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

```

這裏看到每個if判斷裏都存在addWorker方法,那麼這個方法肯定是線程是否複用的重點,

``` Worker w = null; try {

//將任務放到Worker工作線程裏面,
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int rs = runStateOf(ctl.get());

        if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
            if (t.isAlive()) // precheck that t is startable
                throw new IllegalThreadStateException();
    //hashset 集合裏存放 Worker對象
            workers.add(w);
            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
            workerAdded = true;
        }
    } finally {
        mainLock.unlock();
    }
    if (workerAdded) {
        t.start();
        workerStarted = true;
    }
}

} finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;

```

Worker是個final修飾的內部類,意味着不能被其他類繼承,那麼線程複用只能在這一個類裏面進行,接着看Worker的run方法裏面執行的runWorker方法,這個是線程複用的核心方法。

``` final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //獲取線程裏面執行的任務 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //如果任務不為空 || 重新拿取線程裏的任務 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //判斷線程的狀態,並執行對應的拒絕策略 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }

```

getTask方法重新拿取線程裏的任務, 前面一系列的判斷主要是來檢查線程的狀態,以及線程池線程的數量,其核心主要是線程數量是否超過了核心線程數,如果超過了則會進入workQueue工作隊列,workQueue.poll非核心線程會一直去工作隊列裏獲取任務,非核心線程已經滿了,則會workQueue.take()核心線程去獲取任務,前面的runWorker方法是有while循環的,這樣就會一直執行下去,循環拿取任務,如果這個時候工作隊列裏面沒有隊列,超過keepAliveTime線程存活時間還沒有拿到任務,則會對對應線程進行銷燬。

``` private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out?

for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }

    int wc = workerCountOf(c);

    // Are workers subject to culling?
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
        if (compareAndDecrementWorkerCount(c))
            return null;
        continue;
    }

    try {
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}

}

```

總結

在日常開發中,對線程池的優化也是比較重要的,如果線程池的核心線程數和最大線程數都不是隨意定義的,還是要結合本身服務器cpu的情況,以及阻塞隊列的使用,在一定情況下能緩解線程的壓力,本身阻塞隊列是帶有阻塞和喚醒的功能,阻塞隊列的長度也是需要根據實際開大的業務場景去定義的,最後運用好線程池,在處理高併發的業務場景下還是尤為關鍵的一項技術。