讀完 RocketMQ 源碼,我學會了如何優雅的創建線程

RocketMQ 是一款開源的分佈式消息系統,基於高可用分佈式集羣技術,提供低延時、高可靠的消息發佈與訂閲服務。這篇文章,筆者整理了 RocketMQ 源碼中創建線程的幾點技巧,希望大家讀完之後,能夠有所收穫。

1 創建單線程首先我們先温習下常用的創建單線程的兩種方式:實現 Runnable 接口繼承 Thread 類 ▍一、實現 Runnable 接口

圖中,MyRunnable 類實現了 Runnable 接口的 run 方法,run 方法中定義具體的任務代碼或處理邏輯,而Runnable 對象是作為線程構造函數的參數。▍二、 繼承 Thread 類

線程實現類直接繼承 Thread ,本質上也是實現 Runnable 接口的 run 方法。2 單線程抽象類創建單線程的兩種方式都很簡單,但每次創建線程代碼顯得有點宂餘,於是 RocketMQ 裏實現了一個抽象類 ServiceThread 。

我們可以看到抽象類中包含了如下核心方法:定義線程名;啟動線程;關閉線程。下圖展示了 RocketMQ 眾多的單線程實現類。

實現類的編程模版類似 :

我們僅僅需要繼承抽象類,並實現 getServiceName 和 run 方法即可。啟動的時候,調用 start 方法 , 關閉的時候調用 shutdown 方法。3 線程池原理線程池是一種基於池化思想管理線程的工具,線程池維護着多個線程,等待着監督管理者分配可併發執行的任務。這避免了在處理短時間任務時創建與銷燬線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度。 JDK中提供的 ThreadPoolExecutor 類,是我們最常使用的線程池類。

參數名作用corePoolSize隊列沒滿時,線程最大併發數maximumPoolSizes隊列滿後線程能夠達到的最大併發數keepAliveTime空閒線程過多久被回收的時間限制unitkeepAliveTime 的時間單位workQueue阻塞的隊列類型threadPoolFactory改變線程的名稱、線程組、優先級、守護進程狀態RejectedExecutionHandler超出 maximumPoolSizes + workQueue 時,任務會交給RejectedExecutionHandler來處理

任務的調度通過執行 execute方法完成,方法的核心流程如下:如果 workerCount < corePoolSize,創建並啟動一個線程來執行新提交的任務。如果 workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中。如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則創建並啟動一個線程來執行新提交的任務。如果 workerCount >= maximumPoolSize,並且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。

4 線程池封裝在 RocketMQ 裏 ,網絡請求都會攜帶命令編碼,每種命令映射對應的處理器,而處理器又會註冊對應的線程池。

當服務端 Broker 接收到發送消息命令時,都會有單獨的線程池 sendMessageExecutor 來處理這種命令請求。

基於 ThreadPoolExecutor 做了一個簡單的封裝 ,BrokerFixedThreadPoolExecutor 構造函數包含六個核心參數:核心線程數和最大線程數相同 ,數量是:cpu核數和4比較後的最小值;空閒線程的回收的時間限制,默認1分鐘;發送消息隊列,有界隊列,默認10000;線程工廠 ThreadFactoryImpl ,定義了線程名前綴:SendMessageThread_ 。RocketMQ 實現了一個簡單的線程工廠:ThreadFactoryImpl,線程工廠可以定義線程名稱,以及是否是守護線程 。

開源項目 Cobar ,Xmemcached,Metamorphosis 中都有類似線程工廠的實現 。5 線程名很重要 線程名很重要,線程名很重要,線程名很重要 ,重要的事情説三遍。我們看到 RocketMQ 中,無論是單線程抽象類還是多線程的封裝都會配置線程名 ,因為通過線程名,非常容易定位問題,從而大大提升解決問題的效率。 定位的媒介常見有兩種:日誌文件和堆棧記錄。▍一、日誌文件經常處理業務問題的同學,一定都經常與日誌打交道。

查看 ERROR 日誌,追溯到執行線程, 要是線程池隔離做的好,基本可以判斷出哪種業務場景出了問題;通過查看線程打印的日誌,推斷線程調度是否正常,比如有的定時任務線程打印了開始,沒有打印結束,推論當前線程可能已經掛掉或者阻塞。▍二、堆棧記錄jstack 是 java 虛擬機自帶的一種堆棧跟蹤工具 ,主要用來查看 Java 線程的調用堆棧,線程快照包含當前 java 虛擬機內每一條線程正在執行的方法堆棧的集合,可以用來分析線程問題。 jstack -l 進程pid

筆者查看線程堆棧,一般關注如下幾點:當前 jvm 進程中的線程數量和線程分類是否在預期的範圍內;系統接口超時或者定時任務停止的異常場景下 ,分析堆棧中是否有鎖未釋放,或者線程一直等待網絡通訊響應;分析 jvm 進程中哪個線程佔用的 CPU 最高。6 總結本文是RocketMQ 系列文章的開篇,和朋友們簡單聊聊 RocketMQ 源碼裏創建線程的技巧。單線程抽象類 ServiceThread 使用者只需要實現業務邏輯以及定義線程名即可 ,不需要寫宂餘的代碼。線程池封裝適當封裝,定義線程工廠,併合理配置線程池參數。線程名很重要文件日誌,堆棧記錄配合線程名能大大提升解決問題的效率。 RocketMQ 的多線程編程技巧很多,比如線程通訊,併發控制,線程模型等等,後續的文章會一一為大家展現。如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,你的支持會激勵我輸出更高質量的文章,非常感謝!

- 天翼雲全場景業務無縫替換至國產原生操作系統CTyunOS!
- 以羊了個羊為例,淺談小程序抓包與響應報文修改
- 這幾種常見的 JVM 調優場景,你知道嗎?
- 如此狂妄,自稱高性能隊列的Disruptor有啥來頭?
- 為什麼要學習GoF設計模式?
- 827. 最大人工島 : 簡單「並查集 枚舉」運用題
- 手把手教你如何使用 Timestream 實現物聯網時序數據存儲和分析
- 850. 矩形面積 II : 掃描線模板題
- Java 併發編程解析 | 基於JDK源碼解析Java領域中的併發鎖,我們可以從中學習到什麼內容?
- 【手把手】光説不練假把式,這篇全鏈路壓測實踐探索
- 大廠鍾愛的全鏈路壓測有什麼意義?四種壓測方案詳細對比分析
- 寫個續集,填坑來了!關於“Thread.sleep(0)這一行‘看似無用’的代碼”裏面留下的坑。
- 857. 僱傭 K 名工人的最低成本 : 枚舉 優先隊列(堆)運用題
- Vue3 實現一個自定義toast(小彈窗)
- 669. 修剪二叉搜索樹 : 常規樹的遍歷與二叉樹性質
- 讀完 RocketMQ 源碼,我學會了如何優雅的創建線程
- 性能調優——小小的log大大的坑
- 1582. 二進制矩陣中的特殊位置 : 簡單模擬題
- elementui源碼學習之仿寫一個el-switch
- 646. 最長數對鏈 : 常規貪心 DP 運用題