學到就是賺到,面試加分項之WebServer執行緒池管理!

語言: CN / TW / HK

大家好,這篇文章我們來討論一個話題,怎麼去管理 SpringBoot 內建的三大 WebServer(Tomcat、Jetty、Undertow)的執行緒池,包括監控告警、動態調參。

不管是應對越來越卷的面試考察,還是自己專案日常效能調優,絕對有用,學到就是賺到,搬好椅子,開始我們的分析之旅。


寫在前面

要想去管理第三方元件的執行緒池,首先肯定要對這些元件有一定的熟悉度,瞭解整個請求的一個處理過程,找到對應處理請求的執行緒池,這些執行緒池不一定是 JUC 包下的 ThreadPoolExecutor 類,也可能是元件自己實現的執行緒池,但是基本原理都差不多。

Tomcat、Jetty、Undertow 這三個 WebServer 都是這樣,他們並沒有直接使用 JUC 提供的執行緒池實現,而是自己實現了一套,或者擴充套件了 JUC 的實現。翻原始碼找到相應的目標執行緒池後,然後看有沒有暴露 public 方法供我們呼叫獲取,如果沒有就需要考慮通過反射來獲取了。

下面我們來簡單分析下這三大 WebServer 的執行緒池內部實現。


Tomcat 內部執行緒池的實現

  • Tomcat 內部執行緒池沒有直接使用 JUC 下的 ThreadPoolExecutor,而是選擇繼承 JUC 下的 Executor 體系類,然後重寫 execute() 等方法,不同版本有差異。

1.繼承 JUC 原生 ThreadPoolExecutor(9.0.50 版本及以下),並覆寫了一些方法,主要 execute() 和 afterExecute()

2.繼承 JUC 的 AbstractExecutorService(9.0.51 版本及以上),程式碼基本是拷貝 JUC 的 ThreadPoolExecutor,也相應的微調了 execute() 方法執行流程

注意 Tomcat 實現的執行緒池類名稱也叫 ThreadPoolExecutor,名字跟 JUC 下的是一樣的,Tomcat 的 ThreadPoolExecutor 類 execute() 方法如下:

public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

可以看出他是先呼叫父類的 execute()方法,然後捕獲 RejectedExecutionException 異常,再去判斷如果任務佇列型別是 TaskQueue,則嘗試將任務新增到任務佇列中,如果新增失敗,證明佇列已滿,然後再執行拒絕策略,此處 submittedCount 是一個原子變數,記錄提交到此執行緒池但未執行完成的任務數(主要在下面要提到的 TaskQueue 佇列的 offer()方法用),為什麼要這樣設計呢?繼續往下看!

  • Tomcat 定義了阻塞佇列 TaskQueue 繼承自 LinkedBlockingQueue,該佇列主要重寫了 offer()方法。
 @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

可以看到他在入隊之前做了幾個判斷,這裡的 parent 就是所屬的執行緒池物件

1.如果 parent 為 null,直接呼叫父類 offer 方法入隊

2.如果當前執行緒數等於最大執行緒數,則直接呼叫父類 offer()方法入隊

3.如果當前未執行的任務數量小於等於當前執行緒數,仔細思考下,是不是說明有空閒的執行緒呢,那麼直接呼叫父類 offer()入隊後就馬上有執行緒去執行它

4.如果當前執行緒數小於最大執行緒數量,則直接返回 false,然後回到 JUC 執行緒池的執行流程回想下,是不是就去新增新執行緒去執行任務了呢

5.其他情況都直接入隊

  • 因為 Tomcat 執行緒池主要是來做 IO 任務的,做這一切的目的主要也是為了以最小代價的改動更好的支援 IO 密集型的場景,JUC 自帶的執行緒池主要是適合於 CPU 密集型的場景,可以回想一下 JUC 原生執行緒池 ThreadPoolExecutor#execute()方法的執行流程

1.判斷如果當前執行緒數小於核心執行緒池,則新建一個執行緒來處理提交的任務

2.如果當前執行緒數大於核心執行緒數且佇列沒滿,則將任務放入任務佇列等待執行

3.如果當前當前執行緒池數大於核心執行緒池,小於最大執行緒數,且任務佇列已滿,則建立新的執行緒執行提交的任務

4.如果當前執行緒數等於最大執行緒數,且佇列已滿,則拒絕該任務

可以看出噹噹前執行緒數大於核心執行緒數時,JUC 原生執行緒池首先是把任務放到佇列裡等待執行,而不是先建立執行緒執行。

如果 Tomcat 接收的請求數量大於核心執行緒數,請求就會被放到佇列中,等待核心執行緒處理,這樣會降低請求的總體處理速度,所以 Tomcat 並沒有使用 JUC 原生執行緒池,利用 TaskQueue 的 offer()方法巧妙的修改了 JUC 執行緒池的執行流程,改寫後 Tomcat 執行緒池執行流程如下:

1.判斷如果當前執行緒數小於核心執行緒池,則新建一個執行緒來處理提交的任務

2.如果當前當前執行緒池數大於核心執行緒池,小於最大執行緒數,則建立新的執行緒執行提交的任務

3.如果當前執行緒數等於最大執行緒數,則將任務放入任務佇列等待執行

4.如果佇列已滿,則執行拒絕策略

  • Tomcat 核心執行緒池有對應的獲取方法,獲取方式如下
    public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {
        TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer;
        return new ExecutorWrapper(POOL_NAME,
                tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor());
    }
  • 想要動態調整 Tomcat 執行緒池的執行緒引數,可以在引入相應依賴後,在配置檔案中新增以下配置就行,配置檔案完整示例看官網
spring:
  dynamic:
    tp:
      // 省略其他配置項
      tomcatTp:    # tomcat webserver執行緒池配置
        corePoolSize: 100
        maximumPoolSize: 200
        keepAliveTime: 60

Tomcat 執行緒池就介紹到這裡吧,通過以上的一些介紹想必大家對 Tomcat 執行緒池執行任務的流程應該比較清楚了吧。


Jetty 內部執行緒池的實現

  • Jetty 內部執行緒池,定義了一個繼承自 Executor 的 ThreadPool 頂級介面,實現類有以下幾個

  • 內部主要使用 QueuedThreadPool 這個實現類,該執行緒池執行流程就不在詳細解讀了,感興趣的可以自己去看原始碼,核心思想都差不多,圍繞核心執行緒數、最大執行緒數、任務佇列三個引數入手,跟 Tocmat 比對著來看,其實也挺簡單的。
public void execute(Runnable job)
    {
        // Determine if we need to start a thread, use and idle thread or just queue this job
        int startThread;
        while (true)
        {
            // Get the atomic counts
            long counts = _counts.get();

            // Get the number of threads started (might not yet be running)
            int threads = AtomicBiInteger.getHi(counts);
            if (threads == Integer.MIN_VALUE)
                throw new RejectedExecutionException(job.toString());

            // Get the number of truly idle threads. This count is reduced by the
            // job queue size so that any threads that are idle but are about to take
            // a job from the queue are not counted.
            int idle = AtomicBiInteger.getLo(counts);

            // Start a thread if we have insufficient idle threads to meet demand
            // and we are not at max threads.
            startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;

            // The job will be run by an idle thread when available
            if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
                continue;

            break;
        }

        if (!_jobs.offer(job))
        {
            // reverse our changes to _counts.
            if (addCounts(-startThread, 1 - startThread))
                LOG.warn("{} rejected {}", this, job);
            throw new RejectedExecutionException(job.toString());
        }

        if (LOG.isDebugEnabled())
            LOG.debug("queue {} startThread={}", job, startThread);

        // Start a thread if one was needed
        while (startThread-- > 0)
            startThread();
    }
  • Jetty 執行緒池有提供 public 的獲取方法,獲取方式如下
    public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {
        JettyWebServer jettyWebServer = (JettyWebServer) webServer;
        return new ExecutorWrapper(POOL_NAME, jettyWebServer.getServer().getThreadPool());
    }
  • 想要動態調整 Jetty 執行緒池的執行緒引數,可以在引入 DynamicTp 依賴後,在配置檔案中新增以下配置就行,配置檔案完整示例官網介紹
spring:
  dynamic:
    tp:
      // 省略其他配置項
      jettyTp:    # jetty weberver執行緒池配置
        corePoolSize: 100
        maximumPoolSize: 200

Undertow 內部執行緒池的實現

  • Undertow 因為其效能彪悍,輕量,現在用的還是挺多的,wildfly(前身 Jboss)從 8 開始內部預設的 WebServer 用 Undertow 了,之前是 Tomcat 吧。瞭解 Undertow 的小夥伴應該知道,他底層是基於 XNIO 框架(3.X 之前)來做的,這也是 Jboss 開發的一款基於 java nio 的優秀網路框架。但 Undertow 宣佈從 3.0 開始底層網路框架要切換成 Netty 了,官方給的原因是說起網路程式設計,Netty 已經是事實上標準,用 Netty 的好處遠大於 XNIO 能提供的,所以讓我們期待 3.0 的釋出吧,只可惜三年前就宣佈了,至今也沒動靜,不知道是夭折了還是咋的,說實話,改動也挺大的,看啥時候釋出吧,以下的介紹是基於 Undertow 2.x 版本來的

  • Undertow 內部是定義了一個叫 TaskPool 的執行緒池頂級介面,該介面有如圖所示的幾個實現。其實這幾個實現類都是採用組合裝飾的方式,內部都維護一個 JUC 的 Executor 體系類或者維護 Jboss 提供的 EnhancedQueueExecutor 類(也繼承 JUC ExecutorService 類),執行流程可以自己去分析。

  • 具體的建立程式碼如下,根據外部是否傳入,如果有傳入則用外部傳入的類,如果沒有,根據引數設定內部建立一個,具體是用 JUC 的 ThreadPoolExecutor 還是 Jboss 的 EnhancedQueueExecutor,根據配置引數選擇

  • Undertow 執行緒池沒有提供 public 的獲取方法,所以通過反射來獲取,獲取方式如下
    public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {

        UndertowWebServer undertowWebServer = (UndertowWebServer) webServer;
        val undertow = (Undertow) ReflectionUtil.getFieldValue(UndertowWebServer.class, "undertow", undertowWebServer);
        if (Objects.isNull(undertow)) {
            return null;
        }
        return new ExecutorWrapper(POOL_NAME, undertow.getWorker());
    }
  • 想要動態調整 Undertow 執行緒池的執行緒引數,可以在引入 DynamicTp 依賴後,在配置檔案中新增以下配置就行,配置檔案完整示例看官網
spring:
  dynamic:
    tp:
      // 省略其他配置項
      undertowTp:      # undertow webserver執行緒池配置
        corePoolSize: 100
        maximumPoolSize: 200
        keepAliveTime: 60

總結

以上介紹了 Tomcat、Jetty、Undertow 三大 WebServer 內建執行緒池的一些情況,重點介紹了 Tomcat 的,篇幅有限,其他兩個感興趣可以自己分析,原理都差不多。同時也介紹了基於 DynamicTp 怎麼動態調整執行緒池的引數,當我們做 WebServer 效能調優時,能動態調整引數真的是非常好用的。

再次歡迎大家使用 DynamicTp 框架,一起完善專案。


關於 DynamicTp 框架

DynamicTp 是一個基於配置中心實現的輕量級動態執行緒池管理工具,主要功能可以總結為 動態調參、通知報警、執行監控、三方包執行緒池管理等幾大類。

經過幾個版本迭代,目前最新版本 v1.0.7 具有以下特性

特性

  • 程式碼零侵入:所有配置都放在配置中心,對業務程式碼零侵入

  • 輕量簡單:基於 springboot 實現,引入 starter,接入只需簡單 4 步就可完成,順利 3 分鐘搞定

  • 高可擴充套件:框架核心功能都提供 SPI 介面供使用者自定義個性化實現(配置中心、配置檔案解析、通知告警、監控資料採集、任務包裝等等)

  • 線上大規模應用:參考美團執行緒池實踐,美團內部已經有該理論成熟的應用經驗

  • 多平臺通知報警:提供多種報警維度(配置變更通知、活性報警、容量閾值報警、拒絕觸發報警、任務執行或等待超時報警),已支援企業微信、釘釘、飛書報警,同時提供 SPI 介面可自定義擴充套件實現

  • 監控:定時採集執行緒池指標資料,支援通過 MicroMeter、JsonLog 日誌輸出、Endpoint 三種方式,可通過 SPI 介面自定義擴充套件實現

  • 任務增強:提供任務包裝功能,實現 TaskWrapper 介面即可,如 TtlTaskWrapper 可以支援執行緒池上下文資訊傳遞,以及給任務設定標識 id,方便問題追蹤

  • 相容性:JUC 普通執行緒池也可以被框架監控,@Bean 定義時加 @DynamicTp 註解即可

  • 可靠性:框架提供的執行緒池實現 Spring 生命週期方法,可以在 Spring 容器關閉前儘可能多的處理佇列中的任務

  • 多模式:參考 Tomcat 執行緒池提供了 IO 密集型場景使用的 EagerDtpExecutor 執行緒池

  • 支援多配置中心:基於主流配置中心實現執行緒池引數動態調整,實時生效,已支援 Nacos、Apollo、Zookeeper、Consul,同時也提供 SPI 介面可自定義擴充套件實現

  • 中介軟體執行緒池管理:整合管理常用第三方元件的執行緒池,已整合 Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix 等元件的執行緒池管理(調參、監控報警)


專案地址

目前累計 1.6k star,感謝你的 star,歡迎 pr,業務之餘一起給開源貢獻一份力量

官網https://dynamictp.cn

gitee 地址https://gitee.com/dromara/dynamic-tp

github 地址https://github.com/dromara/dynamic-tp


加入社群

看到這兒,請給專案一個star,你的支援是我們前進的動力!

使用過程中有任何問題,或者對專案有什麼想法或者建議,可以加入社群,跟群友一起交流討論。

微信群已滿200人,可以關注微信公眾號,加我個人微信拉群(備註:dynamic-tp)。