Java 技術棧中間件優雅停機方案設計與實現全景圖

語言: CN / TW / HK

本系列 Netty 源碼解析文章基於 4.1.56.Final 版本

本文概要

在上篇文章 我為 Netty 貢獻源碼 | 且看 Netty 如何應對 TCP 連接的正常關閉,異常關閉,半關閉場景 中筆者為大家詳細介紹了 Netty 在處理連接關閉時的完整過程,並詳細介紹了 Netty 如何應對 TCP 連接在關閉時會遇到的各種場景。

在連接關閉之後,接下來就輪到 Netty 的謝幕時刻了,本文筆者會為大家詳盡 Java 技術棧中間件中關於優雅停機方案的詳細設計和實現。

筆者會從日常開發工作中常見的版本發佈,服務上下線的場景聊起,引出服務優雅啟停的需求,並從這個需求出發,一步一步帶大家探究各個中間件裏的優雅停機的相關設計。

熟悉筆者文風的讀者朋友應該知道,筆者肯定不會只是簡單的介紹,要麼不講,要講就要把整個技術體系的前世今生給大家講清楚,講明白。

基於這目的,筆者會先從支持優雅停機的底層技術基石--內核中的信號量開始聊起。

image.png

從內核層我們接着會聊到 JVM 層,在 JVM 層一探優雅停機底層的技術玄機。

image.png

隨後我們會從 JVM 層一路奔襲到 Spring 然後到 Dubbo。在這個過程中,筆者還會帶大家一起 Shooting Dubbo 在優雅停機下的一個 Bug,併為大家詳細介紹修復過程。

image.png

最後由 Dubbo 層的優雅停機,引出我們的主角--Netty 優雅停機的設計與實現:

Reactor優雅關閉總流程.png

下面我們來正式開始本文的內容~~

本文概要.png

1. Java 進程的優雅啟停

在我們的日常開發工作中,業務需求的迭代和優化伴隨圍繞着我們整個開發週期,當我們加班加點完成了業務需求的開發,然後又歷經各種艱難險阻通過了測試的驗證,最後經過和產品經理的各種糾纏相愛相殺之後,終於到了最最激動人心的時刻程序要部署上線了。

上線時的情緒波動.png

那麼在程序部署上線的過程中勢必會涉及到線上服務的關閉和重啟,關於對線上服務的啟停這裏面有很多的講究,萬萬不能簡單粗暴的進行關閉和重啟,因為此時線上服務可能承載着生產的流量,可能正在進行重要的業務處理流程。

比如:用户正在購買商品,錢已經付了,恰好這時趕上程序上線,如果我們這時簡單粗暴的對服務進行關閉,重啟,可能就會導致用户付了錢,但是訂單未創建或者商品未出現在用户的購物清單中,給用户造成了實質的損失,這是非常嚴重的後果。

為了保證能在程序上線的過程中做到業務無損,所以線上服務的 優雅關閉優雅啟動 顯得就非常非常重要了。

保持優雅很重要.png

1.1 優雅啟動

在 Java 程序的運行過程中,程序的運行速度一般會隨着程序的運行慢慢的提高,所以從線上表現上來看 Java 程序在運行一段時間後往往會比程序剛啟動的時候會快很多。

這是因為 Java 程序在運行過程中,JVM 會不斷收集到程序運行時的動態數據,這樣可以將高頻執行代碼通過即時編譯成機器碼,隨後程序運行就直接執行機器碼,運行速度完全不輸 C 或者 C++ 程序。

同時在程序執行過程中,用到的類會被加載到 JVM 中緩存,這樣當程序再次使用到的時候不會觸發臨時加載,影響程序執行性能。

我們可以將以上幾點當做 JVM 帶給我們的性能紅利, 而當應用程序重新啟動之後,這些性能紅利也就消失了 ,如果我們讓新啟動的程序繼續承擔之前的流量規模,那麼就會導致程序在剛啟動的時候在沒有這些性能紅利的加持下直接進入高負荷的運轉狀態,這就可能導致線上請求大面積超時,對業務造成影響。

所以説優雅地啟動一個程序是非常重要的,優雅啟動的核心思想就是讓程序在剛啟動的時候不要承擔太大的流量,讓程序在低負荷的狀態下運行一段時間,使其提升到最佳的運行狀態時,在逐步的讓程序承擔更大的流量處理。

下面我們就來看下常用於優雅啟動場景的兩個技術方案:

1.1.1 啟動預熱

啟動預熱就是讓剛剛上線的應用程序不要一下就承擔之前的全部流量,而是在一個時間窗口內慢慢的將流量打到剛上線的應用程序上,目的是讓 JVM 先緩慢的收集程序運行時的一些動態數據,將高頻代碼即時編譯為機器碼。

這個技術方案在眾多 RPC 框架的實現中我們都可以看到,服務調用方會從註冊中心拿到所有服務提供方的地址,然後從這些地址中通過特定的負載均衡算法從中選取一個服務提供方的發送請求。

為了能夠使剛剛上線的服務提供方有時間去預熱,所以我們就要從源頭上控制服務調用方發送的流量,服務調用方在發起 RPC 調用時應該儘量少的去負載均衡到剛剛啟動的服務提供方實例。

那麼服務調用方如何才能判斷哪些是剛剛啟動的服務提供方實例呢?

服務提供方在啟動成功後會向註冊中心註冊自己的服務信息,我們可以將服務提供方的真實啟動時間包含在服務信息中一起向註冊中心註冊,這樣註冊中心就會通知服務調用方有新的服務提供方實例上線並告知其啟動時間。

服務調用方可以根據這個啟動時間,慢慢的將負載權重增加到這個剛啟動的服務提供方實例上。這樣就可以解決服務提供方冷啟動的問題,調用方通過在一個時間窗口內將請求慢慢的打到提供方實例上,這樣就可以讓剛剛啟動的提供方實例有時間去預熱,達到平滑上線的效果。

1.1.2 延遲暴露

啟動預熱更多的是從服務調用方的角度通過降低剛剛啟動的服務提供方實例的負載均衡權重來實現優雅啟動。

而延遲暴露則是從服務提供方的角度,延遲暴露服務時間,利用延遲的這段時間,服務提供方可以預先加載依賴的一些資源,比如:緩存數據,spring 容器中的 bean 。等到這些資源全部加載完畢就位之後,我們在將服務提供方實例暴露出去。這樣可以有效降低啟動前期請求處理出錯的概率。

比如我們可以在 dubbo 應用中可以配置服務的延遲暴露時間:

//延遲5秒暴露服務
<dubbo:service delay="5000" />

1.2 優雅關閉

優雅關閉需要考慮的問題和處理的場景要比優雅啟動要複雜的多,因為一個正常在線上運行的服務程序正在承擔着生產的流量,同時也正在進行業務流程的處理。

要對這樣的一個服務程序進行優雅關閉保證業務無損還是非常有挑戰的,一個好的關閉流程,可以確保我們業務實現平滑的上下線,避免上線之後增加很多不必要的額外運維工作。

下面我們就來討論下具體應該從哪幾個角度着手考慮實現優雅關閉:

1.2.1 切走流量

image.png

第一步肯定是要將程序承擔的現有流量全部切走,告訴服務調用方,我要進行關閉了,請不要在給我發送請求。那麼如果進行切流呢??

在 RPC 的場景中,服務調用方通過服務發現的方式從註冊中心中動態感知服務提供者的上下線變化。在服務提供方關閉之前,首先就要自己從註冊中心中取消註冊,隨後註冊中心會通知服務調用方,有服務提供者實例下線,請將其從本地緩存列表中剔除。這樣就可以使得服務調用方之後的 RPC 調用不在請求到下線的服務提供方實例上。

但是這裏會有一個問題,就是通常我們的註冊中心都是 AP 類型的,它只會保證最終一致性,並不會保證實時一致性,基於這個原因,服務調用方感知到服務提供者下線的事件可能是延後的,那麼在這個延遲時間內,服務調用方極有可能會向正在下線的服務發起 RPC 請求。

因為服務提供方已經開始進入關閉流程,那麼很多對象在這時可能已經被銷燬了,這時如果在收到請求過來,肯定是無法處理的,甚至可能還會拋出一個莫名其妙的異常出來,對業務造成一定的影響。

那麼既然這個問題是由於註冊中心可能存在的延遲通知引起的,那麼我們就很自然的想到了讓準備下線的服務提供方主動去通知它的服務調用方。

這種服務提供方 主動通知 在加上註冊中心 被動通知 的兩個方案結合在一起應該就能確保萬無一失了吧。

事實上,在大部分場景下這個方案是可行的,但是還有一種極端的情況需要應對,就是當服務提供方通知調用方自己下線的網絡請求在到達服務調用方之前的很極限的一個時間內,服務調用者向正在下線的服務提供方發起了 RPC 請求,這種極端的情況,就需要服務提供方和調用方一起配合來應對了。

首先服務提供方在準備關閉的時候,就把自己設置為正在關閉狀態,在這個狀態下不會接受任何請求,如果這時遇到了上邊這種極端情況下的請求,那麼就拋出一個 CloseException (這個異常是提供方和調用方提前約定好的),調用方收到這個 CloseException ,則將該服務提供方的節點剔除,並從剩餘節點中通過負載均衡選取一個節點進行重試,通過讓這個請求快速失敗從而保證業務無損。

這三種方案結合在一起,筆者認為就是一個比較完美的切流方案了。

1.2.2 儘量保證業務無損

當把流量全部切走後,可能此時將要關閉的服務程序中還有正在處理的部分業務請求,那麼我們就必須得等到這些業務處理請求全部處理完畢,並將業務結果響應給客户端後,在對服務進行關閉。

當然為了保證關閉流程的可控,我們需要引入關閉超時時間限制,當剩下的業務請求處理超時,那麼就強制關閉。

為了保證關閉流程的可控,我們只能做到儘可能的保證業務無損而不是百分之百保證。所以在程序上線之後,我們應該對業務異常數據進行監控並及時修復。

通過以上介紹的優雅關閉方案我們知道,當我們將要優雅關閉一個應用程序時,我們需要做好以下兩項工作:

  1. 我們首先要做的就是將當前將要關閉的應用程序上承載的生產流量全部切走,保證不會有新的流量打到將要關閉的應用程序實例上。

  2. 當所有的生產流量切走之後,我們還需要保證當前將要關閉的應用程序實例正在處理的業務請求要使其處理完畢,並將業務處理結果響應給客户端。以保證業務無損。當然為了使關閉流程變得可控,我們需要引入關閉超時時間。

以上兩項工作就是我們在應用程序將要被關閉時需要做的, 那麼問題是我們如何才能知道應用程序要被關閉呢 ?換句話説,我們在應用程序裏怎麼才能感知到程序進程的關閉事件從而觸發上述兩項優雅關閉的操作執行呢?

既然我們有這樣的需求,那麼操作系統內核肯定會給我們提供這樣的機制,事實上我們可以通過捕獲操作系統給進程發送的信號來獲取關閉進程通知,並在相應信號回調中觸發優雅關閉的操作。

接下來讓我們來看一下操作系統內核提供的信號機制:

2. 內核信號機制

信號是操作系統內核為我們提供用於在進程間通信的機制,內核可以利用信號來通知進程,當前系統所發生的的事件(包括關閉進程事件)。

信號在內核中並沒有用特別複雜的數據結構來表示,只是用一個代號一樣的數字來標識不同的信號。Linux 提供了幾十種信號,分別代表不同的意義。信號之間依靠它們的值來區分

信號可以在任何時候發送給進程,進程需要為這個信號配置信號處理函數。當某個信號發生的時候,就默認執行對應的信號處理函數就可以了。這就相當於一個操作系統的應急手冊,事先定義好遇到什麼情況,做什麼事情,提前準備好,出了事情照着做就可以了。

內核發出的信號就代表當前系統遇到了某種情況,我們需要應對的步驟就封裝在對應信號的回調函數中。

信號機制引入的目的就在於:

  • 讓應用進程知道當前已經發生了某個特定的事件(比如進程的關閉事件)。

  • 強制進程執行我們事先設定好的信號處理函數(比如封裝優雅關閉邏輯)。

通常來説程序一旦啟動就會一直運行下去,除非遇到 OOM 或者我們需要重新發布程序時會在運維腳本中調用 kill 命令關閉程序。Kill 命令從字面意思上來説是殺死進程,但是其本質是向進程發送信號,從而關閉進程。

下面我們使用 kill -l 命令查看下 kill 命令可以向進程發送哪些信號:

# kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP
6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
16) SIGSTKFLT 17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU 25) SIGXFSZ
26) SIGVTALRM 27) SIGPROF 28) SIGWINCH 29) SIGIO 30) SIGPWR
31) SIGSYS 34) SIGRTMIN 35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+3
38) SIGRTMIN+4 39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+8
43) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13
48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12
53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-7
58) SIGRTMAX-6 59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-2
63) SIGRTMAX-1 64) SIGRTMAX

筆者這裏提取幾個常見的信號來簡要説明下:

  • SIGINT: 信號代號為 2 。比如我們在終端以非後台模式運行一個進程實例時,要想關閉它,我們可以通過 Ctrl+C 來關閉這個前台程序。這個 Ctrl+C 向進程發送的正是 SIGINT 信號。

  • SIGQUIT: 信號代號為 3 。比如我們使用 Ctrl+\ 來關閉一個前台進程,此時會向進程發送 SIGQUIT 信號, 與 SIGINT 信號不同的是 ,通過 SIGQUIT 信號終止的進程會在退出時,通過 Core Dump 將當前進程的運行狀態保存在 core dump 文件裏面,方便後續查看。

  • SIGKILL: 信號代號為 9 。通過 kill -9 pid 命令結束進程是非常非常危險的動作, 我們應該堅決制止這種關閉進程的行為 ,因為 SIGKILL 信號是不能被進程捕獲和忽略的,只能執行內核定義的默認操作直接關閉進程。 而我們的優雅關閉操作是需要通過捕獲操作系統信號,從而可以在對應的信號處理函數中執行優雅關閉的動作 。由於 SIGKILL 信號不能被捕獲,所以優雅關閉也就無法實現。現在大家就趕快檢查下自己公司生產環境的運維腳本是否是通過 kill -9 pid 命令來結束進程的,一定要避免用這種方式,因為這種方式是極其無情並且略帶殘忍的關閉進程行為。

image.png
  • SIGSTOP : 信號代號為 19 。該信號和 SIGKILL 信號一樣都是無法被應用程序忽略和捕獲的。向進程發送 SIGSTOP 信號也是無法實現優雅關閉的。通過 Ctrl+Z 來關閉一個前台進程,發送的信號就是 SIGSTOP 信號。

  • SIGTERM: 信號代號為 15 。我們通常會使用 kill 命令來關閉一個後台運行的進程,kill 命令發送的默認信號就是 SIGTERM , 該信號也是本文要討論的優雅關閉的基礎 ,我們通常會使用 kill pid 或者 kill -15 pid 來向後台進程發送 SIGTERM 信號用以實現進程的優雅關閉。大家如果發現自己公司生產環境的運維腳本中使用的是 kill -9 pid 命令來結束進程,那麼就要馬上換成 kill pid 命令。

以上列舉的都是我們常用的一些信號,大家也可以通過 man 7 signal 命令查看每種信號對應的含義:

Signal     Value     Action   Comment
──────────────────────────────────────────────────────────────────────
SIGHUP 1 Term Hangup detected on controlling terminal
or death of controlling process
SIGINT 2 Term Interrupt from keyboard
SIGQUIT 3 Core Quit from keyboard
SIGILL 4 Core Illegal Instruction


SIGABRT 6 Core Abort signal from abort(3)
SIGFPE 8 Core Floating point exception
SIGKILL 9 Term Kill signal
SIGSEGV 11 Core Invalid memory reference
SIGPIPE 13 Term Broken pipe: write to pipe with no
readers
SIGALRM 14 Term Timer signal from alarm(2)
SIGTERM 15 Term Termination signal
SIGUSR1 30,10,16 Term User-defined signal 1
SIGUSR2 31,12,17 Term User-defined signal 2
……

而應用進程對於信號的處理一般分為以下三種方式:

  • 內核定義的默認操作: 系統內核對每種信號都規定了默認操作,比如上面列表 Action 列中的 Term ,就是終止進程的意思。前邊介紹的 SIGINT 信號和 SIGTERM 信號的默認操作就是 Term 。Core 的意思是 Core Dump ,即終止進程後會通過 Core Dump 將當前進程的運行狀態保存在文件裏面,方便我們事後進行分析問題在哪裏。前邊介紹的 SIGQUIT 信號默認操作就是 Core 。

  • 捕獲信號: 應用程序可以利用內核提供的系統調用來捕獲信號,並將優雅關閉的步驟封裝在對應信號的處理函數中。當向進程發送關閉信號 SIGTERM  的時候,在進程內我們可以通過捕獲 SIGTERM 信號,隨即就會執行我們自定義的信號處理函數。我們從而可以在信號處理函數中執行進程優雅關閉的邏輯。

  • 忽略信號: 當我們不希望處理某些信號的時候,就可以忽略該信號,不做任何處理, 但是前邊介紹的 SIGKILL 信號和 SIGSTOP 是無法被捕獲和忽略的,內核會直接執行這兩個信號定義的默認操作直接關閉進程。

當我們不希望信號執行內核定義的默認操作時,我們就需要在進程內捕獲信號,並註冊信號的回調函數來執行我們自定義的信號處理邏輯。

比如我們在本文中要討論的優雅關閉場景,當進程接收到 SIGTERM 信號時,為了實現進程的優雅關閉,我們並不希望進程執行 SIGTERM 信號的默認操作直接關閉進程,所以我們要在進程中捕獲 SIGTERM 信號,並將優雅關閉的操作步驟封裝在對應的信號處理函數中。

2.1 如何捕獲信號

在介紹完了內核信號的分類以及進程對於信號處理的三種方式之後,下面我們來看下如何來捕獲內核信號,並在對應信號回調函數中自定義我們的處理邏輯。

內核提供了 sigaction 系統調用,來供我們捕獲信號以及與相應的信號處理函數綁定起來。

int sigaction(int signum, const struct sigaction *act,
struct sigaction *oldact)
;
  • int signum: 表示我們想要在進程中捕獲的信號,比如本文中我們要實現優雅關閉就需要在進程中捕獲 SIGTERM 信號,對應的 signum = 15 。

  • struct sigaction *act: 內核中會用一個 sigaction 結構體來封裝我們自定義的信號處理邏輯。

  • struct sigaction *oldact: 這裏是為了兼容老的信號處理函數,瞭解一下就可以了,和本文主線無關。

sigaction 結構體用來封裝信號對應的處理函數,以及更加精細化控制信號處理的信息。

struct sigaction {
__sighandler_t sa_handler;
unsigned long sa_flags;
.......
sigset_t sa_mask;
};
  • __sighandler_t sa_handler: 其實本質上是一個函數指針,用來保存我們為信號註冊的信號處理函數, 優雅關閉的邏輯就封裝在這裏

  • long sa_flags: 為了更加精細化的控制信號處理邏輯,這個字段保存了一些控制信號處理行為的選項集合。常見的選項有:

    • SA_ONESHOT:意思是我們註冊的信號處理函數,僅僅只起一次作用。響應完一次後,就設置回默認行為。

    • SA_NOMASK:表示信號處理函數在執行的過程中會被中斷。比如我們進程捕獲到一個感興趣的信號,隨後會執行註冊的信號處理函數,但是此時進程又收到其他的信號或者和上次相同的信號,此時正在執行的信號處理函數會被中斷,從而轉去執行最新到來的信號處理函數。 如果連續產生多個相同的信號,那麼我們的信號處理函數就要做好同步,冪等等措施

    • SA_INTERRUPT:當進程正在執行一個非常耗時的系統調用時,如果此時進程接收到了信號,那麼這個系統調用將會被信號中斷,進程轉去執行相應的信號處理函數。那麼當信號處理函數執行完時,如果這裏設置了 SA_INTERRUPT ,那麼系統調用將不會繼續執行並且會返回一個 -EINTR 常量,告訴調用方,這個系統調用被信號中斷了,怎麼處理你看着辦吧。

    • SA_RESTART:當系統調用被信號中斷後,相應的信號處理函數執行完畢後,如果這裏設置了 SA_RESTART 系統調用將會被自動重新啟動。

  • sigset_t sa_mask: 這個字段主要指定在信號處理函數正在運行的過程中,如果連續產生多個信號,需要屏蔽哪些信號。也就是説當進程收到屏蔽的信號時,正在進行的信號處理函數不會被中斷。

屏蔽並不意味着信號一定丟失,而是暫存,這樣可以使相同信號的處理函數,在進程連續接收到多個相同的信號時,可以一個一個的處理。

最終通過 sigaction 函數會調用到底層的系統調用 rt_sigaction 函數,在 rt_sigaction 中會將上邊介紹的用户態 struct sigaction 結構拷貝為內核態的 k_sigaction ,然後調用 do_sigaction 函數。

最後在 do_sigaction 函數中將用户要在進程中捕獲的信號以及相應的信號處理函數設置到進程描述符 task_struct 結構裏。

進程中的信號結構.png

進程在內核中的數據結構 task_struct 中有一個 struct sighand_struct 結構的屬性 sighand ,struct sighand_struct 結構中包含一個 k_sigaction 類型的數組 action[] ,這個數組保存的就是進程中需要捕獲的信號以及對應的信號處理函數在內核中的結構體 k_sigaction ,數組下標為進程需要捕獲的信號。

#include <signal.h>

static void sig_handler(int signum) {

if (signum == SIGTERM) {

.....執行優雅關閉邏輯....

}

}

int main (Void) {

struct sigaction sa_usr; //定義sigaction結構體
sa_usr.sa_flags = 0;
sa_usr.sa_handler = sig_handler; //設置信號處理函數

sigaction(SIGTERM, &sa_usr, NULL);//進程捕獲信號,註冊信號處理函數

,,,,,,,,,,,,
}

我們可以通過如上簡單的示例代碼,將 SIGTERM 信號及其對應的自定義信號處理函數註冊到進程中,當我們執行 kill -15 pid 命令之後,進程就會捕獲到 SIGTERM 信號,隨後就可以執行優雅關閉步驟了。

3. JVM 中的 ShutdownHook

在《2. 內核信號機制》小節中為大家介紹的內容是操作系統內核為我們實現進程的優雅關閉提供的最底層系統級別的支持機制,在內核的強力支持下,那麼本文的主題 Java 進程的優雅關閉就很容易實現了。

我們要想實現 Java 進程的優雅關閉功能,只需要在進程啟動的時候將優雅關閉的操作封裝在一個 Thread 中,隨後將這個 Thread 註冊到 JVM 的 ShutdownHook 中就好了,當 JVM 進程接收到 kill -15 信號時,就會執行我們註冊的 ShutdownHook 關閉鈎子,進而執行我們定義的優雅關閉步驟。

        Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
.....執行優雅關閉步驟.....
}
});

3.1 導致 JVM 退出的幾種情況

  1. JVM 進程中最後一個非守護線程退出。

  2. 在程序代碼中主動調用 java.lang.System#exit(int status) 方法,會導致 JVM 進程的退出並觸發 ShutdownHook 的調用。參數 int status 如果是非零值,則表示本次關閉是在一個非正常情況下的關閉行為。比如:進程發生 OOM 異常或者其他運行時異常。

public static void main(String[] args) {
try {

......進程啟動main函數.......

} catch (RuntimeException e) {
logger.error(e.getMessage(), e);
// JVM 進程主動關閉觸發調用 shutdownHook
System.exit(1);
}
}
  1. 當 JVM 進程接收到第二小節《2.內核信號機制》介紹的那些關閉信號時, JVM 進程會被關閉。 由於 SIGKILL 信號和 SIGSTOP 信號不能夠被進程捕獲和忽略 ,這兩個信號會直接粗暴地關閉 JVM 進程,所以一般我們會發送 SIGTERM 信號,JVM 進程通過捕獲 SIGTERM 信號,從而可以執行我們定義的 ShutdownHook 完成優雅關閉的操作。

  2. Native Method 執行過程中發生錯誤,比如試圖訪問一個不存在的內存,這樣也會導致 JVM 強制關閉,ShutdownHook 也不會運行。

3.2 使用 ShutdownHook 的注意事項

  1. ShutdownHook 其實本質上是一個已經被初始化但是未啟動的 Thread ,這些通過 Runtime.getRuntime().addShutdownHook 方法註冊的 ShutdownHooks ,在 JVM 進程關閉的時候會被啟動 併發執行 ,但是並 不會保證執行順序

所以在編寫 ShutdownHook 中的邏輯時,我們應該確保程序的線程安全性,並儘可能避免死鎖。最好是一個 JVM 進程只註冊一個 ShutdownHook 。

  1. 如果我們通過 java.lang.Runtime#runFinalizersOnExit(boolean value) 開啟了 finalization-on-exit ,那麼當所有 ShutdownHook 運行完畢之後,JVM 在關閉之前將會繼續調用所有未被調用的 finalizers 方法。默認 finalization-on-exit 選項是關閉的。

注意:當 JVM 開始關閉並執行上述關閉操作的時候,守護線程是會繼續運行的,如果用户使用 java.lang.System#exit(int status) 方法主動發起 JVM 關閉,那麼關閉期間非守護線程也是會繼續運行的。

  1. 一旦 JVM 進程開始關閉,一般情況下這個過程是不可以被中斷的,除非操作系統強制中斷或者用户通過調用 java.lang.Runtime#halt(int status) 來強制關閉。

   public void halt(int status) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkExit(status);
}
Shutdown.halt(status);
}

java.lang.Runtime#halt(int status) 方法是用來強制關閉正在運行的 JVM 進程的,它會導致我們註冊的 ShutdownHook 不會被運行和執行,如果此時 JVM 正在執行 ShutdownHook ,當調用該方法後,JVM 進程將會被強制關閉,並不會等待 ShutdownHook 執行完畢。

  1. 當 JVM 關閉流程開始的時候,就不能在向其註冊 ShutdownHook 或者取消註冊之前已經註冊好的 ShutdownHook 了,否則將會拋出 IllegalStateException異常。

  2. ShutdownHook 中的程序應該儘快的完成優雅關閉邏輯,因為當用户調用 System#exit 方法的時候是希望 JVM 在保證業務無損的情況下儘快完成關閉動作。這裏並不適合做一些需要長時間運行的任務或者和用户交互的操作。

如果是因為物理機關閉從而導致的 JVM 關閉,那麼操作系統只會允許 JVM 限定的時間內儘快的關閉,超過限定時間操作系統將會強制關閉 JVM 。

  1. ShutdownHook 中可能也會拋出異常,而 ShutdownHook 對於 JVM 來説本質上是一個 Thread ,那麼對於 ShutdownHook 中未捕獲的異常,JVM 的處理方法和其他普通的線程一樣,都是通過調用 ThreadGroup#uncaughtException 方法來處理。此方法的默認實現是將異常的堆棧跟蹤打印到 System#err 並終止異常的 ShutdownHook 線程。

注意:這裏只會停止異常的 ShutdownHook ,但不會影響其他 ShutdownHook 線程的執行更不會導致 JVM 退出。

  1. 最後也是非常重要的一點是,當 JVM 進程接收到 SIGKILL 信號和 SIGSTOP 信號時,是會強制關閉,並不會執行 ShutdownHook 。另外一種導致 JVM 強制關閉的情況就是 Native Method 執行過程中發生錯誤,比如試圖訪問一個不存在的內存,這樣也會導致 JVM 強制關閉,ShutdownHook 也不會運行。

3.3 ShutdownHook 執行原理

我們在 JVM 中通過 Runtime.getRuntime().addShutdownHook 添加關閉鈎子,當 JVM 接收到 SIGTERM 信號之後,就會調用我們註冊的這些 ShutdownHooks 。

本小節介紹的 ShutdownHook 就類似於我們在第二小節《內核信號機制》中介紹的信號處理函數。

大家這裏一定會有個疑問,那就是在介紹內核信號機制小節中,我們可以通過系統調用 sigaction 函數向內核註冊進程要捕獲的信號以及對應的信號處理函數。

int sigaction(int signum, const struct sigaction *act,
struct sigaction *oldact)
;

但是在本小節介紹的 JVM 中,我們只是通過 Runtime.getRuntime().addShutdownHook 註冊了一個關閉鈎子。但是並未註冊 JVM 進程所需要捕獲的信號。那麼 JVM 是怎麼捕獲關閉信號的呢?

        Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
.....執行優雅關閉步驟.....
}
});

事實上,JVM 捕獲操作系統信號的部分在 JDK 中已經幫我們處理好了,在用户層我們並不需要關注捕獲信號的處理,只需要關注信號的處理邏輯即可。

下面我們就來看一下 JDK 是如何幫助我們將要捕獲的信號向內核註冊的?

當 JVM 第一個線程被初始化之後,隨後就會調用 System#initializeSystemClass 函數來初始化 JDK 中的一些系統類,其中就包括註冊 JVM 進程需要捕獲的信號以及信號處理函數。

public final class System {

private static void initializeSystemClass() {

.......省略.......

// Setup Java signal handlers for HUP, TERM, and INT (where available).
Terminator.setup();

.......省略.......

}

}

從這裏可以看出,JDK 在向 JVM 註冊需要捕獲的內核信號是在 Terminator 類中進行的。


class Terminator {
//信號處理函數
private static SignalHandler handler = null;

static void setup() {
if (handler != null) return;
SignalHandler sh = new SignalHandler() {
public void handle(Signal sig) {
Shutdown.exit(sig.getNumber() + 0200);
}
};
handler = sh;

try {
Signal.handle(new Signal("HUP"), sh);
} catch (IllegalArgumentException e) {
}
try {
Signal.handle(new Signal("INT"), sh);
} catch (IllegalArgumentException e) {
}
try {
Signal.handle(new Signal("TERM"), sh);
} catch (IllegalArgumentException e) {
}
}

}

JDK 向我們提供了 sun.misc.Signal#handle(Signal signal, SignalHandler signalHandler) 函數來實現在 JVM 進程中對內核信號的捕獲。底層依賴於我們在第二小節介紹的系統調用 sigaction 。

int sigaction(int signum, const struct sigaction *act,
struct sigaction *oldact)
;

sun.misc.Signal#handle 函數的參數含義和系統調用函數 sigaction 中的參數含義是一一對應的:

  • Signal signal :表示要捕獲的內核信號。從這裏我們可以看出 JVM 主要捕獲三種信號:SIGHUP(1),SIGINT(2),SIGTERM(15)。

除了上述的這三種信號之外,JVM 如果接收到其他信號,會執行系統內核默認的操作,直接關閉進程,並不會觸發 ShutdownHook 的執行。

  • SignalHandler handler :信號響應函數。我們看到這裏直接調用了 Shutdown#exit 函數。
    SignalHandler sh = new SignalHandler() {
public void handle(Signal sig) {
Shutdown.exit(sig.getNumber() + 0200);
}
};

我們這裏應該很容易就會猜測出 ShutdownHook 的調用應該就是在 Shutdown#exit 函數中被觸發的。

class Shutdown {

static void exit(int status) {

........省略.........

synchronized (Shutdown.class) {
// 開始 JVM 關閉流程,執行 ShutdownHooks
sequence();
// 強制關閉 JVM
halt(status);
}

}

private static void sequence() {
synchronized (lock) {
if (state != HOOKS) return;
}
//觸發 ShutdownHooks
runHooks();
boolean rfoe;
synchronized (lock) {
state = FINALIZERS;
rfoe = runFinalizersOnExit;
}
//如果 runFinalizersOnExit = true
//開始運行所有未被調用過的 Finalizers
if (rfoe) runAllFinalizers();
}
}

Shutdown#sequence 函數中的邏輯就是我們在《3.2 使用ShutdownHook的注意事項》小節中介紹的 JVM 關閉時的運行邏輯:在這裏會觸發所有 ShutdownHook 的 併發運行 。注意這裏並不會保證運行順序。

當所有 ShutdownHook 運行完畢之後,如果我們通過 java.lang.Runtime#runFinalizersOnExit(boolean value) 開啟了 finalization-on-exit 選項,JVM 在關閉之前將會繼續調用所有未被調用的 finalizers 方法。默認 finalization-on-exit 選項是關閉的。

3.4 ShutdownHook 的執行

shutdownhook的運行.png

如上圖所示,在 JDK 的 Shutdown 類中,包含了一個 Runnable[] hooks 數組,容量為 10 。JDK 中的 ShutdownHook 是以類型來分類的,數組 hooks 每一個槽中存放的是一種特定類型的 ShutdownHook 。

而我們通常在程序代碼中通過 Runtime.getRuntime().addShutdownHook 註冊的是 Application hooks 類型的 ShutdownHook ,存放在數組 hooks 中索引為 1 的槽中。

當在 Shutdown#sequence 中觸發 runHooks() 函數開始運行 JVM 中所有類型的 ShutdownHooks 時,會在 runHooks() 函數中依次遍歷數組 hooks 中的 Runnable ,進而開始運行 Runnable 中封裝的 ShutdownHooks 。

當遍歷到數組 Hooks 的第二個槽(索引為 1 )的時候, Application hooks 類型的 ShutdownHook 得以運行,也就是我們通過 Runtime.getRuntime().addShutdownHook 註冊的 ShutdownHook 在這個時候開始運行起來。


// The system shutdown hooks are registered with a predefined slot.
// The list of shutdown hooks is as follows:
// (0) Console restore hook
// (1) Application hooks
// (2) DeleteOnExit hook
private static final int MAX_SYSTEM_HOOKS = 10;
private static final Runnable[] hooks = new Runnable[MAX_SYSTEM_HOOKS];

/* Run all registered shutdown hooks
*/

private static void runHooks() {
for (int i=0; i < MAX_SYSTEM_HOOKS; i++) {
try {
Runnable hook;
synchronized (lock) {
// acquire the lock to make sure the hook registered during
// shutdown is visible here.
currentRunningHook = i;
hook = hooks[i];
}
if (hook != null) hook.run();
} catch(Throwable t) {
if (t instanceof ThreadDeath) {
ThreadDeath td = (ThreadDeath)t;
throw td;
}
}
}
}

下面我們就來看一下,JDK 是如果通過 Runtime.getRuntime().addShutdownHook 函數將我們自定義的 ShutdownHook 註冊到 Shutdown 類中的數組 Hooks 裏的。

3.5 ShutdownHook 的註冊

public class Runtime {

public void addShutdownHook(Thread hook) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(new RuntimePermission("shutdownHooks"));
}
//注意 這裏註冊的是 Application 類型的 hooks
ApplicationShutdownHooks.add(hook);
}

}

從 JDK 源碼中我們看到在 Runtime 類中的 addShutdownHook 方法裏,JDK 會將我們自定義的 ShutdownHook 封裝在 ApplicationShutdownHooks 類中,從這類的命名上看,它裏邊封裝的就是我們在上小節《3.4 ShutdownHook 的執行》提到的 Application hooks 類型的 ShutdownHook ,由用户自定義實現。

class ApplicationShutdownHooks {
// 存放用户自定義的 Application 類型的 hooks
private static IdentityHashMap<Thread, Thread> hooks;

static synchronized void add(Thread hook) {
if(hooks == null)
throw new IllegalStateException("Shutdown in progress");

if (hook.isAlive())
throw new IllegalArgumentException("Hook already running");

if (hooks.containsKey(hook))
throw new IllegalArgumentException("Hook previously registered");

hooks.put(hook, hook);
}

static void runHooks() {
Collection<Thread> threads;
synchronized(ApplicationShutdownHooks.class) {
threads = hooks.keySet();
hooks = null;
}
// 順序啟動 shutdownhooks
for (Thread hook : threads) {
hook.start();
}
// 併發調用 shutdownhooks ,等待所有 hooks 運行完畢退出
for (Thread hook : threads) {
try {
hook.join();
} catch (InterruptedException x) { }
}
}
}

ApplicationShutdownHooks 類中也有一個集合 IdentityHashMap<Thread, Thread> hooks ,專門用來存放由用户自定義的 Application hooks 類型的 ShutdownHook 。通過 ApplicationShutdownHooks#add 方法添加進 hooks 集合中。

然後在 runHooks 方法裏挨個啟動 ShutdownHook 線程,併發執行。 注意這裏的 runHooks 方法是 ApplicationShutdownHooks 類中的

在 ApplicationShutdownHooks 類的靜態代碼塊 static{.....} 中會將 runHooks 方法封裝成 Runnable 添加進 Shutdown 類中的 hooks 數組中。注意這裏 Shutdown#add 方法傳遞進的索引是 1 。

class ApplicationShutdownHooks {
/* The set of registered hooks */
private static IdentityHashMap<Thread, Thread> hooks;

static {
try {
Shutdown.add(1 /* shutdown hook invocation order */,
false /* not registered if shutdown in progress */,
new Runnable() {
public void run() {
runHooks();
}
}
);
hooks = new IdentityHashMap<>();
} catch (IllegalStateException e) {
// application shutdown hooks cannot be added if
// shutdown is in progress.
hooks = null;
}
}
}

Shutdownhook的執行.png

Shutdown#add 方法的邏輯就很簡單了:

class Shutdown {

private static final int MAX_SYSTEM_HOOKS = 10;
private static final Runnable[] hooks = new Runnable[MAX_SYSTEM_HOOKS];

static void add(int slot, boolean registerShutdownInProgress, Runnable hook) {
synchronized (lock) {
if (hooks[slot] != null)
throw new InternalError("Shutdown hook at slot " + slot + " already registered");

if (!registerShutdownInProgress) {
if (state > RUNNING)
throw new IllegalStateException("Shutdown in progress");
} else {
if (state > HOOKS || (state == HOOKS && slot <= currentRunningHook))
throw new IllegalStateException("Shutdown in progress");
}

hooks[slot] = hook;
}
}
}
  • 參數 Runnable hook 就是在 ApplicationShutdownHooks 中的靜態代碼塊 static{....} 中將 runHooks 方法封裝成的 Runnable。

  • 參數 int slot 表示將封裝好的 Runnable 放入 hooks 數組中的哪個槽中。這裏我們註冊的是 Application hooks 類型的 ShutdonwHook ,所以這裏的索引為 1 。

  • 參數 registerShutdownInProgress 表示是否允許在 JVM 關閉流程開始之後,繼續向 JVM 添加 ShutdownHook 。默認為 false 表示不允許。否則將會拋出 IllegalStateException 異常。這一點筆者在小節《3.2 使用ShutdownHook的注意事項》中強調過。

以上就是 JVM 如何捕獲操作系統內核信號,如何註冊 ShutdownHook ,以及何時觸發 ShutdownHook 的執行的一個全面介紹。

shutdownhook完整觸發時機.png

讀到這裏大家應該徹底明白了為什麼不能使用 kill -9 pid 命令來關閉進程了吧,現在趕快去檢查一下你們公司生產環境的運維腳本吧!!

俗話説的好 talk is cheap! show me the code! ,在介紹了這麼多關於優雅關閉的理論方案和原理之後,我想大家現在一定很好奇究竟我們該如何實現這一套優雅關閉的方案呢?

那麼接下來筆者就從一些知名框架源碼實現角度,為大家詳細闡述一下優雅關閉是如何實現的?

image.png

4. Spring 的優雅關閉機制

前面兩個小節中我們從支持優雅關閉最底層的內核信號機制開始聊起然後到 JVM 進程實現優雅關閉的 ShutdwonHook 原理,經過這一系列的介紹,我們現在對優雅關閉在內核層和 JVM 層的相關機制原理有了一定的瞭解。

那麼在真實 Java 應用中,我們到底該如何基於上述機制實現一套優雅關閉方案呢?本小節我們來從 Spring 源碼中獲取下答案!!

在介紹 Spring 優雅關閉機制源碼實現之前,筆者先來帶大家回顧下,在 Spring 的應用上下文關閉的時候,Spring 究竟給我們提供了哪些關閉時的回調機制,從而可以讓我們在這些回調中編寫 Java 應用的優雅關閉邏輯。

4.1 發佈 ContextClosedEvent 事件

在 Spring 上下文開始關閉的時候,首先會發布 ContextClosedEvent 事件,注意此時 Spring 容器的 Bean 還沒有開始銷燬,所以我們可以在該事件回調中執行優雅關閉的操作。

@Component
public class ShutdownListener implements ApplicationListener<ContextClosedEvent> {
@Override
public void onApplicationEvent(ContextClosedEvent event) {
........優雅關閉邏輯.....
}
}

4.2 Spring 容器中的 Bean 銷燬前回調

當 Spring 開始銷燬容器中管理的 Bean 之前,會回調所有實現 DestructionAwareBeanPostProcessor 接口的 Bean 中的 postProcessBeforeDestruction 方法。

@Component
public class DestroyBeanPostProcessor implements DestructionAwareBeanPostProcessor {

@Override
public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {

........Spring容器中的Bean開始銷燬前回調.......
}
}

4.3 回調標註 @PreDestroy 註解的方法

@Component
public class Shutdown {
@PreDestroy
public void preDestroy() {
......釋放資源.......
}
}

4.4 回調 DisposableBean 接口中的 destroy 方法

@Component
public class Shutdown implements DisposableBean{

@Override
public void destroy() throws Exception {
......釋放資源......
}

}

4.5 回調自定義的銷燬方法

<bean id="Shutdown" class="com.test.netty.Shutdown"  destroy-method="doDestroy"/>
public class Shutdown {

public void doDestroy() {
.....自定義銷燬方法....
}
}

4.6 Spring 優雅關閉機制的實現

Spring 相關應用程序本質上也是一個 JVM 進程,所以 Spring 框架想要實現優雅關閉機制也必須依託於我們在本文第三小節中介紹的 JVM 的 ShutdownHook 機制。

在 Spring 啟動的時候,需要向 JVM 註冊 ShutdownHook ,當我們執行 kill - 15 pid 命令時,隨後 Spring 會在 ShutdownHook 中觸發上述介紹的五種回調。

下面我們來看一下 Spring 中 ShutdownHook 的註冊邏輯:

4.6.1 Spring 中 ShutdownHook 的註冊

public abstract class AbstractApplicationContext extends DefaultResourceLoader
implements ConfigurableApplicationContext, DisposableBean
{

@Override
public void registerShutdownHook() {
if (this.shutdownHook == null) {
// No shutdown hook registered yet.
this.shutdownHook = new Thread() {
@Override
public void run() {
synchronized (startupShutdownMonitor) {
doClose();
}
}
};
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
}
}

在 Spring 啟動的時候,我們需要調用 AbstractApplicationContext#registerShutdownHook 方法向 JVM 註冊 Spring 的 ShutdownHook ,從這段源碼中我們看出,Spring 將 doClose() 方法封裝在 ShutdownHook 線程中,而 doClose() 方法裏邊就是 Spring 優雅關閉的邏輯。

這裏需要強調的是,當我們在一個純 Spring 環境下,Spring 框架是不會為我們主動調用 registerShutdownHook 方法去向 JVM 註冊 ShutdownHook 的,我們需要手動調用 registerShutdownHook 方法去註冊。

public class SpringShutdownHook {

public static void main(String[] args) throws IOException {
GenericApplicationContext context = new GenericApplicationContext();
........
// 註冊 Shutdown Hook
context.registerShutdownHook();
........
}
}

而在 SpringBoot 環境下,SpringBoot 在啟動的時候會為我們調用這個方法去主動註冊 ShutdownHook 。我們不需要手動註冊。

public class SpringApplication {

public ConfigurableApplicationContext run(String... args) {

...............省略.................

ConfigurableApplicationContext context = null;
context = createApplicationContext();
refreshContext(context);

...............省略.................
}

private void refreshContext(ConfigurableApplicationContext context) {
refresh(context);
if (this.registerShutdownHook) {
try {
context.registerShutdownHook();
}
catch (AccessControlException ex) {
// Not allowed in some environments.
}
}
}

}

4.6.2 Spring 中的優雅關閉邏輯

 protected void doClose() {
// 更新上下文狀態
if (this.active.get() && this.closed.compareAndSet(false, true)) {
if (logger.isInfoEnabled()) {
logger.info("Closing " + this);
}
// 取消 JMX 託管
LiveBeansView.unregisterApplicationContext(this);

try {
// 發佈 ContextClosedEvent 事件
publishEvent(new ContextClosedEvent(this));
}
catch (Throwable ex) {
logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
}

// 回調 Lifecycle beans,相關 stop 方法
if (this.lifecycleProcessor != null) {
try {
this.lifecycleProcessor.onClose();
}
catch (Throwable ex) {
logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
}
}

// 銷燬 bean,觸發前面介紹的幾種回調
destroyBeans();

// Close the state of this context itself.
closeBeanFactory();

// Let subclasses do some final clean-up if they wish...
onClose();

// Switch to inactive.
this.active.set(false);
}
}

在這裏我們可以看出最終是在 AbstractApplicationContext#doClose 方法中觸發本小節開始介紹的五種回調:

  1. 發佈 ContextClosedEvent 事件。 注意這裏是一個同步事件 ,也就是説 Spring 的 ShutdownHook 線程在這裏發佈完事件之後會繼續同步執行事件的處理,等到事件處理完畢後,才會去執行後面的 destroyBeans() 方法對 IOC 容器中的 Bean 進行銷燬。

所以在 ContextClosedEvent 事件監聽類中,可以放心地去做優雅關閉相關的操作,因為此時 Spring 容器中的 Bean 還沒有被銷燬。

  1. destroyBeans() 方法中依次觸發剩下的四種回調。

最後結合前邊小節中介紹的內容,總結 Spring 的整個優雅關閉流程如下圖所示:

Spring優雅關閉機制.png

5. Dubbo 的優雅關閉

本小節優雅關閉部分源碼基於 apache dubbo 2.7.7 版本,該版本中的優雅關閉是有 Bug 的,下面讓我們一起來 Shooting Bug !

在前邊幾個小節的內容中,我們從內核提供的底層技術支持開始聊到了 JVM 的 ShutdonwHook ,然後又從 JVM 聊到了 Spring 框架的優雅關閉機制。

在瞭解了這些內容之後,本小節我們就來看下 dubbo 中的優雅關閉實現,由於現在幾乎所有 Java 應用都會採用 Spring 作為開發框架,所以 dubbo 一般是集成在 Spring 框架中供我們使用的,它的優雅關閉和 Spring 有着緊密的聯繫。

5.1 Dubbo 在 Spring 環境下的優雅關閉

在本文第四小節《4. Spring的優雅關閉機制》的介紹中,我們知道在 Spring 的優雅關閉流程中,Spring 的 ShutdownHook 線程會首先發布 ContextClosedEvent 事件, 該事件是一個同步事件 ,ShutdownHook 線程發佈完該事件緊接着就會同步執行該事件的監聽器,當在事件監聽器中處理完 ContextClosedEvent 事件之後,在回過頭來執行 destroyBeans() 方法並依次觸發剩下的四種回調來銷燬 IOC 容器中的 Bean 。

Spring優雅關閉流程.png

由於在處理 ContextClosedEvent 事件的時候,Dubbo 所依賴的一些關鍵 bean 這時還沒有被銷燬,所以 dubbo 定義了一個 DubboBootstrapApplicationListener 用來監聽 ContextClosedEvent 事件,並在 onContextClosedEvent 事件處理方法中調用 dubboBootstrap.stop() 方法開啟 dubbo 的優雅關閉流程。

public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
implements Ordered
{

@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
// 這裏是 Spring 的同步事件,publishEvent 和處理 Event 是在同一個線程中
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}

private void onContextClosedEvent(ContextClosedEvent event) {
// spring 在 shutdownhook 中會先觸發 ContextClosedEvent ,然後在銷燬 spring beans
// 所以這裏 dubbo 開始優雅關閉時,依賴的 spring beans 並未銷燬
dubboBootstrap.stop();
}

}

當服務提供者 ServiceBean 和服務消費者 ReferenceBean 被初始化時,會將 DubboBootstrapApplicationListener 註冊到 Spring 容器中。並開始監聽 ContextClosedEvent 事件和 ContextRefreshedEvent 事件。

public class ServiceClassPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,
ResourceLoaderAware, BeanClassLoaderAware
{

@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {

// @since 2.7.5 註冊spring啟動 關閉事件的listener
//在事件回調中中調用啟動類 DubboBootStrap的start stop來啟動 關閉dubbo應用
registerBeans(registry, DubboBootstrapApplicationListener.class);

........省略.......
}
}

5.2 Dubbo 優雅關閉流程簡介

由於本文的主題是介紹優雅關閉的一整條流程主線,所以這裏筆者只是簡要介紹 Dubbo 優雅關閉的主流程,相關細節部分筆者會在後續的 dubbo 源碼解析系列裏為大家詳細介紹 Dubbo 優雅關閉的細節。為了避免本文發散太多,我們這裏還是聚焦於流程主線。

public class DubboBootstrap extends GenericEventListener {

public DubboBootstrap stop() throws IllegalStateException {
destroy();
return this;
}

}

這裏的核心邏輯其實就是我們在《1.2 優雅關閉》小節中介紹的兩大優雅關閉主題:

  • 從當前正在關閉的應用實例上切走現有生產流量。

  • 保證業務無損。

這裏大家只需要瞭解 Dubbo 優雅關閉的主流程即可,相關細節筆者後續會有一篇專門的文章詳細為大家介紹。

    public void destroy() {
if (destroyLock.tryLock()) {
try {
DubboShutdownHook.destroyAll();

if (started.compareAndSet(true, false)
&& destroyed.compareAndSet(false, true)) {

//取消註冊
unregisterServiceInstance();
//取消元數據服務
unexportMetadataService();
//停止暴露服務
unexportServices();
//取消訂閲服務
unreferServices();
//註銷註冊中心
destroyRegistries();
//關閉服務
DubboShutdownHook.destroyProtocols();
//銷燬註冊中心客户端實例
destroyServiceDiscoveries();
//清除應用配置類以及相關應用模型
clear();
//關閉線程池
shutdown();
//釋放資源
release();
}
} finally {
destroyLock.unlock();
}
}
}

從以上內容可以看出,Dubbo 的優雅關閉依託於 Spring ContextClosedEvent 事件的發佈,而 ContextClosedEvent 事件的發佈又依託於 Spring ShutdownHook 的註冊。

dubbo spring環境優雅關閉.png

從《4.6.1 Spring 中 ShutdownHook 的註冊》小節的介紹中我們知道,在 SpringBoot 環境下,SpringBoot 在啟動的時候會為我們調用 ApplicationContext#registerShutdownHook 方法去主動註冊 ShutdownHook 。我們不需要手動註冊。

而在一個純 Spring 環境下,Spring 框架並不會為我們主動調用 registerShutdownHook 方法去向 JVM 註冊 ShutdownHook 的,我們需要手動調用 registerShutdownHook 方法去註冊。

所以 Dubbo 這裏為了兼容 SpringBoot 環境和純 Spring 環境下的優雅關閉,引入了 SpringExtensionFactory類 ,只要在 Spring 環境下都會調用 registerShutdownHook 去向 JVM 註冊 Spring 的 ShutdownHook 。

public class SpringExtensionFactory implements ExtensionFactory {
private static final Logger logger = LoggerFactory.getLogger(SpringExtensionFactory.class);

private static final Set<ApplicationContext> CONTEXTS = new ConcurrentHashSet<ApplicationContext>();

public static void addApplicationContext(ApplicationContext context) {
CONTEXTS.add(context);
if (context instanceof ConfigurableApplicationContext) {
//在spring啟動成功之後設置shutdownHook(兼容非SpringBoot環境)
((ConfigurableApplicationContext) context).registerShutdownHook();
}
}

}

當服務提供者 ServiceBean 和服務消費者 ReferenceBean 在初始化完成之後,會回調 SpringExtensionFactory#addApplicationContext 方法註冊 ShutdownHook 。

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware
{

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
SpringExtensionFactory.addApplicationContext(applicationContext);
}

}
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
ApplicationContextAware, InitializingBean, DisposableBean
{

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
SpringExtensionFactory.addApplicationContext(applicationContext);
}

}

以上就是 Dubbo 在 Spring 集成環境下的優雅關閉全流程,下面我們來看下 Dubbo 在非 Spring 環境下的優雅關閉流程。

5.3 Dubbo 在非 Spring 環境下的優雅關閉

在上小節的介紹中我們知道 Dubbo 在 Spring 環境下依託 Spring 的 ShutdownHook ,通過監聽 ContextClosedEvent 事件,從而觸發 Dubbo 的優雅關閉流程。

而到了非 Spring 環境下,Dubbo 就需要定義自己的 ShutdownHook ,從而引入了 DubboShutdownHook ,直接將優雅關閉流程封裝在自己的 ShutdownHook 中執行。

public class DubboBootstrap extends GenericEventListener {

private DubboBootstrap() {
configManager = ApplicationModel.getConfigManager();
environment = ApplicationModel.getEnvironment();

DubboShutdownHook.getDubboShutdownHook().register();
ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
@Override
public void callback() throws Throwable {
DubboBootstrap.this.destroy();
}
});
}

}
public class DubboShutdownHook extends Thread {

public void register() {
if (registered.compareAndSet(false, true)) {
DubboShutdownHook dubboShutdownHook = getDubboShutdownHook();
Runtime.getRuntime().addShutdownHook(dubboShutdownHook);
dispatch(new DubboShutdownHookRegisteredEvent(dubboShutdownHook));
}
}

@Override
public void run() {
if (logger.isInfoEnabled()) {
logger.info("Run shutdown hook now.");
}

callback();
doDestroy();
}

private void callback() {
callbacks.callback();
}

}

從源碼中我們看到,當我們的 Dubbo 應用程序接收到 kill -15 pid 信號時,JVM 捕獲到 SIGTERM(15) 信號之後,就會觸發 DubboShutdownHook 線程運行,從而通過 callback() 又回調了上小節中介紹的 DubboBootstrap#destroy 方法(dubbo 的整個優雅關閉邏輯全部封裝在這裏)。

dubbo 非Spring環境下優雅關閉流程.png
public class DubboBootstrap extends GenericEventListener {

public void destroy() {
if (destroyLock.tryLock()) {
try {
DubboShutdownHook.destroyAll();

if (started.compareAndSet(true, false)
&& destroyed.compareAndSet(false, true)) {

........取消註冊......

........取消元數據服務........

........停止暴露服務........

........取消訂閲服務........

........註銷註冊中心........

........關閉服務........

........銷燬註冊中心客户端實例........

........清除應用配置類以及相關應用模型........

........關閉線程池........

........釋放資源........

}
} finally {
destroyLock.unlock();
}
}
}

}

5.4 啊哈!Bug!

前邊我們在《5.1 Dubbo在Spring環境下的優雅關閉》小節和《5.3 Dubbo在非Spring環境下的優雅關閉》小節中介紹的這兩個環境的下的優雅關閉方案,當它們在各自的場景下運行的時候是沒有任何問題的。

但是當這兩種方案結合在一起運行,就出大問題了~~~

還記得筆者在《3.2 使用 ShutdownHook 的注意事項》小節中特別強調的一點:

  • ShutdownHook 其實本質上是一個已經被初始化但是未啟動的 Thread ,這些通過 Runtime.getRuntime().addShutdownHook 方法註冊的 ShutdownHooks ,在 JVM 進程關閉的時候會被啟動 併發執行,但是並不會保證執行順序

所以在編寫 ShutdownHook 中的邏輯時,我們應該確保程序的線程安全性,並儘可能避免死鎖。最好是一個 JVM 進程只註冊一個 ShutdownHook 。

Dubbo在Spring環境下的優雅關閉Bug.png

那麼現在 JVM 中我們註冊了兩個 ShutdownHook 線程,一個 Spring 的 ShutdownHook ,另一個是 Dubbo 的 ShutdonwHook 。那麼這會引出什麼問題呢?

經過前邊的內容介紹我們知道,無論是在 Spring 的 ShutdownHook 中觸發的 ContextClosedEvent 事件還是在 Dubbo 的 ShutdownHook 中執行的 CallBack 。最終都會調用到 DubboBootstrap#destroy 方法執行真正的優雅關閉邏輯。

public class DubboBootstrap extends GenericEventListener {

private final Lock destroyLock = new ReentrantLock();

public void destroy() {
if (destroyLock.tryLock()) {
try {
DubboShutdownHook.destroyAll();

if (started.compareAndSet(true, false)
&& destroyed.compareAndSet(false, true)) {

.......dubbo應用的優雅關閉.......

}
} finally {
destroyLock.unlock();
}
}
}

}

讓我們來設想一個這種的場景:當 Spring 的 ShutdownHook 線程和 Dubbo 的 ShutdownHook 線程同時執行並且在同一個時間點來到 DubboBootstrap#destroy 方法中爭奪 destroyLock 。

  • Dubbo 的 ShutdownHook 線程獲得 destroyLock 進入 destroy() 方法體開始執行優雅關閉邏輯。

  • Spring 的 ShutdownHook 線程沒有獲得 destroyLock,退出 destroy() 方法。

Dubbo優雅關閉Bug.png

在 Spring 的 ShutdownHook 線程退出 destroy() 方法之後緊接着就會執行 destroyBeans() 方法銷燬 IOC 容器中的 Bean ,這裏邊肯定涉及到一些關鍵業務 Bean 的銷燬,比如:數據庫連接池,以及 Dubbo 相關的核心 Bean。

於此同時 Dubbo 的 ShutdownHook 線程開始執行優雅關閉邏輯,《1.2 優雅關閉》小節中我們提到,優雅關閉要保證業務無損。所以需要將剩下正在進行中的業務流程繼續處理完畢並將業務處理結果響應給客户端。但是這時依賴的一些業務關鍵 Bean 已經被銷燬,比如數據庫連接池,這時執行數據庫操作就會拋出 CannotGetJdbcConnectionException 。導致優雅關閉失敗,對業務造成了影響。

5.5 Bug 的修復

該 Bug 最終在 apache dubbo 2.7.15 版本中被修復

詳情可查看Issue:https://github.com/apache/dubbo/issues/7093

經過上小節的分析,我們知道既然這個 Bug 產生的原因是由於 Spring 的 ShutdownHook 線程和 Dubbo 的 ShutdownHook 線程併發執行所導致的。

那麼當我們處於 Spring 環境下的時候,就將 Dubbo 的 ShutdownHook 註銷掉即可。

public class SpringExtensionFactory implements ExtensionFactory {
private static final Logger logger = LoggerFactory.getLogger(SpringExtensionFactory.class);

private static final Set<ApplicationContext> CONTEXTS = new ConcurrentHashSet<ApplicationContext>();

public static void addApplicationContext(ApplicationContext context) {
CONTEXTS.add(context);
if (context instanceof ConfigurableApplicationContext) {
// 註冊 Spring 的 ShutdownHook
((ConfigurableApplicationContext) context).registerShutdownHook();
// 在 Spring 環境下將 Dubbo 的 ShutdownHook 取消掉
DubboShutdownHook.getDubboShutdownHook().unregister();
}
}
}

而在非 Spring 環境下,我們依然保留 Dubbo 的 ShutdownHook 。

public class DubboBootstrap {

private DubboBootstrap() {
configManager = ApplicationModel.getConfigManager();
environment = ApplicationModel.getEnvironment();

DubboShutdownHook.getDubboShutdownHook().register();
ShutdownHookCallbacks.INSTANCE.addCallback(DubboBootstrap.this::destroy);
}

}

以上內容就是 Dubbo 的整個優雅關閉主線流程,以及優雅關閉 Bug 產生的原因和修復方案。

在 Dubbo 的優雅關閉流程中最終會通過 DubboShutdownHook.destroyProtocols() 關閉底層服務。

public class DubboBootstrap extends GenericEventListener {

private final Lock destroyLock = new ReentrantLock();

public void destroy() {
if (destroyLock.tryLock()) {
try {
DubboShutdownHook.destroyAll();

if (started.compareAndSet(true, false)
&& destroyed.compareAndSet(false, true)) {

.......dubbo應用的優雅關閉.......
//關閉服務
DubboShutdownHook.destroyProtocols();

.......dubbo應用的優雅關閉.......

}
} finally {
destroyLock.unlock();
}
}
}

}

在 Dubbo 服務的銷燬過程中,會通過調用 server.close 關閉底層的 Netty 服務。

public class DubboProtocol extends AbstractProtocol {

@Override
public void destroy() {
for (String key : new ArrayList<>(serverMap.keySet())) {
ProtocolServer protocolServer = serverMap.remove(key);
RemotingServer server = protocolServer.getRemotingServer();
server.close(ConfigurationUtils.getServerShutdownTimeout());
...........省略........
}

...........省略........
}

最終觸發 Netty 的優雅關閉。

public class NettyServer extends AbstractServer implements RemotingServer {

@Override
protected void doClose() throws Throwable {
..........關閉底層Channel......
try {
if (bootstrap != null) {
// 關閉 Netty 的主從 Reactor 線程組
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
.........清理緩存Channel數據.......
}

}

6. Netty 的優雅關閉

通過上小節介紹 dubbo 優雅關閉的相關內容,我們很自然的引出了 Netty 的優雅關閉觸發時機,那麼在本小節中筆者將為大家詳細介紹下 Netty 是如何優雅地裝..........優雅地謝幕的~~

image.png

在之前的系列文章中,我們圍繞下圖所展示的 Netty 整個核心框架的運轉流程介紹了主從 ReactorGroup 的創建, 啟動運行接收網絡連接接收網絡數據 ,發送網絡數據,以及 如何在pipeline中處理相關IO事件 的整個源碼實現。

netty中的reactor.png

本小節就到了 Netty 優雅謝幕的時刻了,在這謝幕的過程中,Netty 會對它的主從 ReactorGroup ,以及對應 ReactorGroup 中的 Reacto r進行優雅的關閉。下面讓我們一起來看下這個優雅關閉的過程~~~

6.1 ReactorGroup 的優雅謝幕


public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {

static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}

}

在 Netty 進行優雅關閉的整個過程中,這裏涉及到了兩個非常重要的控制參數:

  • gracefulShutdownQuietPeriod :優雅關閉靜默期,默認為 2s 。這個參數主要來保證 Netty 整個關閉過程中的 優雅 。在關閉流程開始後,如果 Reactor 中還有遺留的異步任務需要執行,那麼 Netty 就不能關閉,需要把所有異步任務執行完畢才可以。當所有異步任務執行完畢後,Netty 為了實現更加優雅的關閉操作,一定要保障業務無損,這時候就引入了靜默期這個概念,如果在這個靜默期內,用户沒有新的任務向 Reactor 提交那麼就開始關閉。如果在這個靜默期內,還有用户繼續提交異步任務,那麼就不能關閉,需要把靜默期內用户提交的異步任務執行完畢才可以放心關閉。

  • gracefulShutdownTimeout :優雅關閉超時時間,默認為 15s 。這個參數主要來保證 Netty 整個關閉過程的 可控 。我們知道一個生產級的優雅關閉方案既要保證優雅做到業務無損,更重要的是要保證關閉流程的可控,不能無限制的優雅下去。導致長時間無法完成關閉動作。於是 Netty 就引入了這個參數,如果優雅關閉超時,那麼無論此時有無異步任務需要執行都要開始關閉了。

這兩個控制參數是非常重要核心的兩個參數,我們在後面介紹 Netty 關閉細節的時候還會為大家詳細剖析,這裏大家先從概念上大概理解一下。

在介紹完這兩個重要核心參數之後,我們接下來看下 ReactorGroup 的關閉流程:

我們都知道 Netty 為了保證整個系統的吞吐量以及保證 Reactor 可以線程安全地,有序地處理各個 Channel 上的 IO 事件。基於這個目的 Netty 將其承載的海量連接分攤打散到不同的 Reactor 上處理。

ReactorGroup 中包含多個 Reactor ,每個 Channel 只能註冊到一個固定的 Reactor 上,由這個固定的 Reactor 負責處理該 Channel 上整個生命週期的事件。

一個 Reactor 上註冊了多個 Channel ,負責處理註冊在其上的所有 Channel 的 IO 事件以及異步任務。

ReactorGroup 的結構如下圖所示:

image.png

ReactorGroup 的關閉流程本質上其實是 ReactorGroup 中包含的所有 Reactor 的關閉,當 ReactorGroup 中的所有 Reactor 完成關閉後,ReactorGroup 才算是真正的關閉。


public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

// Reactor線程組中的Reactor集合
private final EventExecutor[] children;

// 關閉future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}

@Override
public Future<?> terminationFuture() {
return terminationFuture;
}

}

  • EventExecutor[] children :數組中存放的是當前 ReactorGroup 中包含的所有 Reactor,類型為 EventExecutor。

  • Promise<?> terminationFuture :ReactorGroup 中的關閉 Future ,用户線程通過這個 terminationFuture 可以知道 ReactorGroup 完成關閉的時機,也可以向 terminationFuture 註冊一些 listener 。當 ReactorGroup 完成關閉動作後,會回調用户註冊的這些 listener 。大家可以根據各自的業務場景靈活運用。

在 ReactorGroup 的關閉過程中,會挨個觸發它所包含的所有 Reactor 的關閉流程。並返回 terminationFuture 給用户線程。

當 ReactorGroup 中的所有 Reactor 完成關閉之後,這個 terminationFuture 會被設置為 success,這樣一來用户線程可以感知到 ReactorGroup 已經完成關閉了。

這一點筆者也在 《Reactor在Netty中的實現(創建篇)》 一文中的第四小節《4. 向Reactor線程組中所有的Reactor註冊terminated回調函數》強調過。

在 ReactorGroup 創建的最後一步,會定義 Reactor 關閉的 terminationListener。在 Reactor 的 terminationListener 中會判斷當前 ReactorGroup 中的 Reactor 是否全部關閉,如果已經全部關閉,則會設置 ReactorGroup的 terminationFuture 為 success 。

    //記錄關閉的Reactor個數,當Reactor全部關閉後,ReactorGroup才可以認為關閉成功
private final AtomicInteger terminatedChildren = new AtomicInteger();
//ReactorGroup的關閉future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args)
{

........挨個創建Reactor............

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
//當所有Reactor關閉後 ReactorGroup才認為是關閉成功
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
//向每個Reactor註冊terminationListener
e.terminationFuture().addListener(terminationListener);
}
}

從以上 ReactorGroup 的關閉流程我們可以看出,ReactorGroup 的關閉邏輯只是挨個去觸發它所包含的所有 Reactor 的關閉,Netty 的整個優雅關閉核心其實是在單個 Reactor 的關閉邏輯上。畢竟 Reactor 才是真正驅動 Netty 運轉的核心引擎。

6.2 Reactor 的優雅謝幕

Reactor的優雅謝幕流程.png

Reactor 的狀態特別重要,從 《一文聊透Netty核心引擎Reactor的運轉架構》 一文中我們知道 Reactor 是在一個 for (;;) {....} 死循環中 996 不停地工作。比如輪詢 Channel 上的 IO 就緒事件,處理 IO 就緒事件,執行異步任務就是在這個死循環中完成的。

而 Reactor 在每一次循環任務結束之後,都會先去判斷一下當前 Reactor 的狀態,如果狀態變為準備關閉狀態 ST_SHUTTING_DOWN 後,Reactor 就會開啟優雅關閉流程。

所以在介紹 Reactor 的關閉流程之前,筆者先來為大家捋一捋 Reactor 中的各種狀態。

  • ST_NOT_STARTED = 1 :Reactor 的初始狀態。在 Reactor 剛被創建出來的時候,狀態為 ST_NOT_STARTED 。

  • ST_STARTED = 2 :Reactor 的啟動狀態。當向 Reactor 提交第一個異步任務的時候會觸發 Reactor 的啟動。啟動之後狀態變為 ST_STARTED 。

相關細節可在回顧下 《詳細圖解Netty Reactor啟動全流程》 一文。

  • ST_SHUTTING_DOWN = 3 :Reactor 準備開始關閉狀態。當 Reactor 的 shutdownGracefully 方法被調用的時候,Reactor 的狀態就會變為ST_SHUTTING_DOWN。在這個狀態下,用户仍然可以向 Reactor 提交任務。

  • ST_SHUTDOWN = 4 :Reactor 停止狀態。表示 Reactor 的優雅關閉流程已經結束, 此時用户不能在向 Reactor 提交任務 ,Reactor 會在這個狀態下最後一次執行剩餘的異步任務。

  • ST_TERMINATED = 5 :Reactor 真正的終結狀態,該狀態表示 Reactor 已經完全關閉了。在這個狀態下 Reactor 會設置自己的 terminationFuture 為 Success。進而開始回調上小節末尾提到的 terminationListener 。

在我們瞭解了 Reactor 的各種狀態之後,下面就該來正式開始介紹 Reactor 的關閉流程了:

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

//Reactor的狀態 初始為未啟動狀態
private volatile int state = ST_NOT_STARTED;

//Reactor的初始狀態,未啟動
private static final int ST_NOT_STARTED = 1;
//Reactor啟動後的狀態
private static final int ST_STARTED = 2;
//準備正在進行優雅關閉,此時用户仍然可以提交任務,Reactor仍可以執行任務
private static final int ST_SHUTTING_DOWN = 3;
//Reactor停止狀態,表示優雅關閉結束,此時用户不能在提交任務,Reactor最後一次執行剩餘的任務
private static final int ST_SHUTDOWN = 4;
//Reactor中的任務已被全部執行完畢,且不在接受新的任務,真正的終止狀態
private static final int ST_TERMINATED = 5;

//優雅關閉的靜默期
private volatile long gracefulShutdownQuietPeriod;
//優雅關閉超時時間
private volatile long gracefulShutdownTimeout;

//Reactor的關閉Future
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {

......省略參數校驗.......

//此時Reactor的狀態為ST_STARTED
if (isShuttingDown()) {
return terminationFuture();
}

boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;) {
if (isShuttingDown()) {
return terminationFuture();
}
int newState;
//需要喚醒Reactor去執行關閉流程
wakeup = true;
oldState = state;
if (inEventLoop) {
newState = ST_SHUTTING_DOWN;
} else {
switch (oldState) {
case ST_NOT_STARTED:
case ST_STARTED:
newState = ST_SHUTTING_DOWN;
break;
default:
//Reactor正在關閉或者已經關閉
newState = oldState;
wakeup = false;
}
}
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
//優雅關閉靜默期,在該時間內,用户還是可以向Reactor提交任務並且執行,只要有任務在Reactor中,就不能進行關閉
//每隔100ms檢測是否有任務提交進來,如果在靜默期內沒有新的任務提交,那麼才會進行關閉 保證關閉行為的優雅
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
//優雅關閉的最大超時時間,優雅關閉行為不能超過該時間,如果超過的話 不管當前是否還有任務 都要進行關閉
//保證關閉行為的可控
gracefulShutdownTimeout = unit.toNanos(timeout);

//這裏需要保證Reactor線程是在運行狀態,如果已經停止,那麼就不在進行後續關閉行為,直接返回terminationFuture
if (ensureThreadStarted(oldState)) {
return terminationFuture;
}

//將正在監聽IO事件的Reactor從Selector上喚醒,表示要關閉了,開始執行關閉流程
if (wakeup) {
//確保Reactor線程在執行完任務之後 不會在selector上停留
taskQueue.offer(WAKEUP_TASK);
if (!addTaskWakesUp) {
//如果此時Reactor正在Selector上阻塞,則可以確保Reactor被及時喚醒
wakeup(inEventLoop);
}
}

return terminationFuture();
}

@Override
public Future<?> terminationFuture() {
return terminationFuture;
}

}

首先在開啟關閉流程之前,需要調用 isShuttingDown() 判斷一下當前 Reactor 是否已經開始關閉流程或者已經完成關閉。如果已經開始關閉了,這裏會直接返回 Reactor 的 terminationFuture 。


@Override
public boolean isShuttingDown() {
return state >= ST_SHUTTING_DOWN;
}

剩下的邏輯就是不停的在一個 for 循環中通過 CAS 不停的嘗試將 Reactor 的當前 ST_STARTED 狀態改為 ST_SHUTTING_DOWN 正在關閉狀態。

如果通過 inEventLoop() 判斷出當前執行線程是 Reactor 線程,那麼表示當前 Reactor 的狀態只會是 ST_STARTED 運行狀態,那麼就可以直接將 newState 設置為 ST_SHUTTING_DOWN 。因為只有 Reactor 處於 ST_STARTED 狀態的時候才會運行到這裏。否則在前邊就直接返回 terminationFuture了。

如果當前執行線程為用户線程並不是 Reactor 線程的話,那麼此時 Reactor 的狀態可能是正在關閉狀態或者已經關閉狀態,用户線程在重複發起 Reactor 的關閉流程。所以這些異常場景的處理會在 switch(oldState){....} 語句中完成。

            switch (oldState) {
case ST_NOT_STARTED:
case ST_STARTED:
newState = ST_SHUTTING_DOWN;
break;
default:
//Reactor正在關閉或者已經關閉
newState = oldState;
//當前Reactor已經處於關閉流程中,則無需在喚醒Reactor了
wakeup = false;
}

如果當前 Reactor 還未發起關閉流程,比如狀態為 ST_NOT_STARTED 或者 ST_STARTED ,那麼直接可以放心的將 newState 設置為 ST_SHUTTING_DOWN 。

如果當前 Reactor 已經處於關閉流程中或者已經完成關閉,比如狀態為 ST_SHUTTING_DOWN ,ST_SHUTDOWN 或者 ST_TERMINATED 。則沒有必要在喚醒 Reactor 重複執行關閉流程了 wakeup = false。Reactor 的狀態維持當前狀態不變。

當 Reactor 的狀態確定完畢後,則在 for 循環中不斷的通過 CAS 修改 Reactor 的當前狀態。此時 oldState = ST_STARTED ,newState = ST_SHUTTING_DOWN 。


if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}

隨後在 Reactor 中設置我們在《6.1 ReactorGroup 的優雅謝幕》小節開始處介紹的控制 Netty 優雅關閉的兩個非常重要的核心參數:

  • gracefulShutdownQuietPeriod :優雅關閉靜默期,默認為 2s 。當 Reactor 中已經沒有異步任務需要在執行時,該靜默期開始觸發,Netty 在這裏會每隔 100ms 檢測一下是否有任務提交進來,如果在靜默期內沒有新的任務提交,那麼才會進行關閉,保證關閉行為的優雅。

  • gracefulShutdownTimeout :優雅關閉超時時間,默認為 15s 。優雅關閉行為不能超過該時間,如果超過的話不管當前是否還有任務都要進行關閉,保證關閉行為的可控。

流程走到這裏,Reactor 就開始準備執行關閉流程了,那麼在進行關閉操作之前,我們需要確保 Reactor 線程此時應該是運行狀態,如果此時 Reactor 線程還未開始運行那麼就需要讓它運行起來執行關閉操作。


//這裏需要保證Reactor線程是在運行狀態,如果已經停止,
//那麼就不在進行後續關閉行為,直接返回terminationFuture
if (ensureThreadStarted(oldState)) {
return terminationFuture;
}


private boolean ensureThreadStarted(int oldState) {
if (oldState == ST_NOT_STARTED) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_TERMINATED);
terminationFuture.tryFailure(cause);

if (!(cause instanceof Exception)) {
// Also rethrow as it may be an OOME for example
PlatformDependent.throwException(cause);
}
return true;
}
}
return false;
}

如果此時 Reactor 線程剛剛執行完異步任務或者正在 Selector 上阻塞,那麼我們需要確保 Reactor 線程被及時的喚醒,從而可以直接進入關閉流程。wakeup == true。

這裏的 addTaskWakesUp 默認為 false 。表示並不是只有 addTask 方法才能喚醒 Reactor 線程 還有其他方法可以喚醒 Reactor 線程,比如 SingleThreadEventExecutor#execute 方法還有本小節介紹的 SingleThreadEventExecutor#shutdownGracefully 方法都會喚醒 Reactor 線程。

關於 addTaskWakesUp 字段的詳細含義和作用,大家可以回顧下 《一文聊透 Netty 核心引擎 Reactor 的運轉架構》 一文中的《1.2.2 Reactor 開始輪詢 IO 就緒事件》小節。


//將正在監聽IO事件的Reactor從Selector上喚醒,表示要關閉了,開始執行關閉流程
if (wakeup) {
//確保Reactor線程在執行完任務之後 不會在selector上停留
taskQueue.offer(WAKEUP_TASK);
if (!addTaskWakesUp) {
//如果此時Reactor正在Selector上阻塞,則可以確保Reactor被及時喚醒
wakeup(inEventLoop);
}
}

  • 通過 taskQueue.offer(WAKEUP_TASK) 向 Reactor 中添加 WAKEUP_TASK,可以確保 Reactor 在執行完異步任務之後不會在 Selector 上做停留,直接執行關閉操作。

  • 如果此時 Reactor 線程正在 Selector 上阻塞,那麼直接調用 wakeup(inEventLoop) 喚醒 Reactor 線程,直接來到關閉流程。

public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
selector.wakeup();
}
}
}

6.3 Reactor 線程的優雅關閉

我們先來通過一張 Reactor 優雅關閉整體流程圖來從總體上俯撼一下關閉流程:

Reactor線程優雅關閉流程.png

通過 《一文聊透Netty核心引擎Reactor的運轉架構》 一文的介紹,我們知道 Reacto r是在一個 for 循環中 996 不停地處理 IO 事件以及執行異步任務。如下面筆者提取的 Reactor 運行框架所示:

public final class NioEventLoop extends SingleThreadEventLoop {

@Override
protected void run() {
for (;;) {
try {
.......1.監聽Channel上的IO事件.......
.......2.處理Channel上的IO事件.......
.......3.執行異步任務..........
} finally {
try {
if (isShuttingDown()) {
//關閉Reactor上註冊的所有Channel,停止處理IO事件,觸發unActive以及unRegister事件
closeAll();
//註銷掉所有Channel停止處理IO事件之後,剩下的就需要執行Reactor中剩餘的異步任務了
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}

}

在 Reactor 在每次 for 循環的末尾 finally{....} 語句塊中都會通過 isShuttingDown() 方法去檢查當前 Reactor 的狀態是否是關閉狀態,如果是關閉狀態則開始正式進入 Reactor 的優雅關閉流程。

我們在本文前邊《1.2 優雅關閉》小節中在討論優雅關閉方案的時候提到,我們要着重從以下兩個方面來實施優雅關閉:

  1. 首先需要切走程序承擔的現有流量。

  2. 保證現有剩餘的任務可以執行完畢,保證業務無損。

Netty 這裏實現的優雅關閉同樣也遵從這兩個要點。

  1. 在優雅關閉流程開始之前首先會調用 closeAll() 方法,將 Reactor 上註冊的所有 Channel 全部關閉掉,切掉現有流量。

  2. 隨後會調用 confirmShutdown() 方法,將剩餘的異步任務執行完畢。在該方法中只要有異步任務需要執行,就不能關閉,保證業務無損。該方法返回值為 true 時表示可以進行關閉。返回 false 時表示不能馬上關閉。

6.3.1 切走流量

    private void closeAll() {
//這裏的目的是清理selector中的一些無效key
selectAgain();
//獲取Selector上註冊的所有Channel
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys) {
//獲取NioSocketChannel
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
.........省略......
}
}

for (AbstractNioChannel ch: channels) {
//關閉Reactor上註冊的所有Channel,並在pipeline中觸發unActive事件和unRegister事件
ch.unsafe().close(ch.unsafe().voidPromise());
}
}

首先會通過 selectAgain() 最後一次在 Selector 上執行一次非阻塞輪詢操作,目的是清除 Selector 上的一些無效 Key 。

關於無效 Key 的清除,詳細細節大家可以回看下 《一文聊透Netty核心引擎Reactor的運轉架構》 一文中的《3.1.3 從Selector中移除失效的SelectionKey》小節。

隨後通過 selector.keys() 獲取在 Selector 上註冊的所有 SelectionKey 。進而獲取到 Netty 中的 NioSocketChannel 。SelectionKey 與 NioSocketChannel 的對應關係如下圖所示:

channel與SelectionKey對應關係.png

最後將註冊在 Reactor 上的這些 NioSocketChannel 挨個進行關閉。

Channel 的關閉流程可以回看下筆者的這篇文章 《且看 Netty 如何應對 TCP 連接的正常關閉,異常關閉,半關閉場景》

6.3.2 保證業務無損

該方法中的邏輯是保證 Reactor 進行優雅關閉的核心,Netty 這裏為了保證業務無損,採取的是隻要有異步任務 Task 或者 ShutdwonHooks 需要執行,就不能關閉,需要等待所有 tasks 或者 ShutdownHooks 執行完畢,才會考慮關閉的事情。

    protected boolean confirmShutdown() {
if (!isShuttingDown()) {
return false;
}

if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}

//取消掉所有的定時任務
cancelScheduledTasks();

if (gracefulShutdownStartTime == 0) {
//獲取優雅關閉開始時間,相對時間
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}

//這裏判斷只要有task任務需要執行就不能關閉
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) {
// Executor shut down - no new tasks anymore.
return true;
}

/**
* gracefulShutdownQuietPeriod表示在這段時間內,用户還是可以繼續提交異步任務的,Reactor在這段時間內
* 是會保證這些任務被執行到的。
*
* gracefulShutdownQuietPeriod = 0 表示 沒有這段靜默時期,當前Reactor中的任務執行完畢後,無需等待靜默期,執行關閉
* */

if (gracefulShutdownQuietPeriod == 0) {
return true;
}
//避免Reactor在Selector上阻塞,因為此時已經不會再去處理IO事件了,專心處理關閉流程
taskQueue.offer(WAKEUP_TASK);
return false;
}

//此時Reactor中已經沒有任務可執行了,是時候考慮關閉的事情了
final long nanoTime = ScheduledFutureTask.nanoTime();

//當Reactor中所有的任務執行完畢後,判斷是否超過gracefulShutdownTimeout
//如果超過了 則直接關閉
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}

//即使現在沒有任務也還是不能進行關閉,需要等待一個靜默期,在靜默期內如果沒有新的任務提交,才會進行關閉
//如果在靜默期內還有任務繼續提交,那麼靜默期將會重新開始計算,進入一輪新的靜默期檢測
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
taskQueue.offer(WAKEUP_TASK);
try {
//gracefulShutdownQuietPeriod內每隔100ms檢測一下 是否有任務需要執行
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}

return false;
}

// 在整個gracefulShutdownQuietPeriod期間內沒有任務需要執行或者靜默期結束 則無需等待gracefulShutdownTimeout超時,直接關閉
return true;
}

在關閉流程開始之前,Netty 首先會調用 cancelScheduledTasks() 方法將 Reactor 中剩餘需要執行的定時任務全部取消掉。

記錄優雅關閉開始時間 gracefulShutdownStartTime ,這是為了後續判斷優雅關閉流程是否超時。

調用 runAllTasks() 方法將 Reactor 中 TaskQueue 裏剩餘的異步任務全部取出執行。

運行剩餘tasks和hooks.png

調用 runShutdownHooks() 方法將用户註冊在 Reactor 上的 ShutdownHook 取出執行。

我們可以在用户線程中通過如下方式向 Reactor 中註冊 ShutdownHooks :

        NioEventLoop reactor = (NioEventLoop) ctx.channel().eventLoop();
reactor.addShutdownHook(new Runnable() {
@Override
public void run() {
.....關閉邏輯....
}
});

在 Reactor 進行關閉的時候,會取出用户註冊的這些 ShutdownHooks 進行運行。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

//可以向Reactor添加shutdownHook,當Reactor關閉的時候會被調用
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();

private boolean runShutdownHooks() {
boolean ran = false;
while (!shutdownHooks.isEmpty()) {
List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
shutdownHooks.clear();
for (Runnable task: copy) {
try {
//Reactor線程挨個順序同步執行
task.run();
} catch (Throwable t) {
logger.warn("Shutdown hook raised an exception.", t);
} finally {
ran = true;
}
}
}

if (ran) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}

return ran;
}

}

需要注意的是這裏的 ShutdownHooks 是 Netty 提供的一種機制並不是我們在《3. JVM 中的 ShutdownHook》小節中介紹的 JVM 中的 ShutdownHooks 。

JVM 中的 ShutdownHooks 是一個 Thread ,JVM 在關閉之前會 併發無序 地運行。而 Netty 中的 ShutdownHooks 是一個 Runnable ,Reactor 在關閉之前,會由 Reactor 線程 同步有序 地執行。

這裏需要注意的是隻要有 tasks 和 hooks 需要執行 Netty 就會一直執行下去直到這些任務全部執行完為止。

當 Reactor 沒有任何任務需要執行時,這時就會判斷當前關閉流程所用時間是否超過了我們前邊設定的優雅關閉最大超時時間 gracefulShutdownTimeout 。

nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout

如果關閉流程因為前邊這些任務的執行導致已經超時,那麼就直接關閉 Reactor ,退出 Reactor 的工作循環。

如果沒有超時,那麼這時就會觸發前邊介紹的優雅關閉的靜默期 gracefulShutdownQuietPeriod 。

在靜默期中 Reactor 線程會每隔 100ms 檢查一下是否有用户提交任務請求,如果有的話,就需要保證將用户提交的這些任務執行完畢。然後靜默期將會重新開始計算,進入一輪新的靜默期檢測。

如果在整個靜默期內,沒有任何任務提交,則無需等待 gracefulShutdownTimeout 超時,直接關閉 Reactor ,退出 Reactor 的工作循環。

從以上過程我們可以看出 Netty 的優雅關閉至少需要等待一個靜默期的時間。還有一點是 Netty 優雅關閉的時間可能會超出 gracefulShutdownTimeout ,因為 Netty 需要保證遺留剩餘的任務被執行完畢。當所有任務執行完畢之後,才會去檢測是否超時。

6.4 Reactor 的最終關閉流程

當在靜默期內沒有任何任務提交或者關閉流程超時時,上小節中介紹的 confirmShutdown() 就會返回 true 。隨即 Reactor 線程就會退出工作循環。

public final class NioEventLoop extends SingleThreadEventLoop {

@Override
protected void run() {
for (;;) {
try {
.......1.監聽Channel上的IO事件.......
.......2.處理Channel上的IO事件.......
.......3.執行異步任務..........
} finally {
try {
if (isShuttingDown()) {
//關閉Reactor上註冊的所有Channel,停止處理IO事件,觸發unActive以及unRegister事件
closeAll();
//註銷掉所有Channel停止處理IO事件之後,剩下的就需要執行Reactor中剩餘的異步任務了
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}

}

我們在 《詳細圖解 Netty Reactor 啟動全流程》 一文中的《1.3.3 Reactor 線程的啟動》小節中的介紹中提到,Reactor 線程的啟動是通過第一個異步任務被提交到 Reactor 中的時候被觸發的。在向 Reactor 提交任務的方法 SingleThreadEventExecutor#execute(java.lang.Runnable, boolean) 中會觸發下面 doStartThread() 方法的調用,在這裏會調用前邊提到的 Reactor 工作循環 run() 方法。

在 doStartThread() 方法的 finally{...} 語句塊中會完成 Reactor 的最終關閉流程,也就是 Reactor 在退出 run 方法中的 for 循環之後的後續收尾流程。

最終 Reactor 的優雅關閉完整流程如下圖所示:

Reactor優雅關閉全流程.png
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {

..........省略.........

try {
//Reactor線程開始輪詢處理IO事件,執行異步任務
SingleThreadEventExecutor.this.run();
//後面的邏輯為用户調用shutdownGracefully關閉Reactor退出循環 走到這裏
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//走到這裏表示在靜默期內已經沒有用户在向Reactor提交任務了,或者達到優雅關閉超時時間,開始對Reactor進行關閉
//如果當前Reactor不是關閉狀態則將Reactor的狀態設置為ST_SHUTTING_DOWN
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}

try {
for (;;) {
//此時Reactor線程雖然已經退出,而此時Reactor的狀態為shuttingdown,但任務隊列還在
//用户在此時依然可以提交任務,這裏是確保用户在最後的這一刻提交的任務可以得到執行。
if (confirmShutdown()) {
break;
}
}

for (;;) {
// 當Reactor的狀態被更新為SHUTDOWN後,用户提交的任務將會被拒絕
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}

// 這裏Reactor的狀態已經變為SHUTDOWN了,不會在接受用户提交的新任務了
// 但為了防止用户在狀態變為SHUTDOWN之前,也就是Reactor在SHUTTINGDOWN的時候 提交了任務
// 所以此時Reactor中可能還會有任務,需要將剩餘的任務執行完畢
confirmShutdown();
} finally {
try {
//SHUTDOWN狀態下,在將全部的剩餘任務執行完畢後,則將Selector關閉
cleanup();
} finally {
// 清理Reactor線程中的threadLocal緩存,並通知相應future。
FastThreadLocal.removeAll();

//ST_TERMINATED狀態為Reactor真正的終止狀態
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);

//使得awaitTermination方法返回
threadLock.countDown();

//統計一下當前reactor任務隊列中還有多少未執行的任務,打出日誌
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}

/**
* 通知Reactor的terminationFuture成功,在創建Reactor的時候會向其terminationFuture添加Listener
* 在listener中增加terminatedChildren個數,當所有Reactor關閉後 ReactorGroup關閉成功
* */

terminationFuture.setSuccess(null);
}
}
}
}
});
}
}

流程走到 doStartThread 方法中的 finally{...} 語句塊中的時候,這個時候表示在優雅關閉的靜默期內,已經沒有任務繼續向 Reactor 提交了。或者關閉耗時已經超過了設定的優雅關閉最大超時時間。

現在正式來到了 Reactor 的關閉流程。在流程開始之前需要確保當前 Reactor 的狀態為 ST_SHUTTING_DOWN 正在關閉狀態。

注意此刻用户線程依然可以向 Reactor 提交任務。當 Reactor 的狀態變為 ST_SHUTDOWN 或者 ST_TERMINATED 時,用户向 Reactor 提交的任務就會被拒絕,但是此時 Reactor 的狀態為 ST_SHUTTING_DOWN ,依然可以接受用户提交過來的任務。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
@Override
public boolean isShutdown() {
return state >= ST_SHUTDOWN;
}

private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
//當Reactor的狀態為ST_SHUTDOWN時,拒絕用户提交的異步任務,但是在優雅關閉ST_SHUTTING_DOWN狀態時還是可以接受用户提交的任務的
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}

.........省略........
}
}

所以 Reactor 從工作循環 run 方法中退出隨後流程一路走到這裏來的這段時間,用户仍然有可能向 Reactor 提交任務,為了確保關閉流程的優雅,這裏會在 for 循環中不停的執行 confirmShutdown() 方法直到所有的任務全部執行完畢。

隨後會將 Reactor 的狀態改為 ST_SHUTDOWN 狀態,此時用户就不能在向 Reactor 提交任務了。如果此時在提交任務就會收到 RejectedExecutionException 異常。

大家這裏可能會有疑問,Netty 在 Reactor 的狀態變為 ST_SHUTDOWN 之後,又一次調用了 confirmShutdown() 方法,這是為什麼呢?

其實這樣做的目的是為了防止 Reactor 狀態在變為 SHUTDOWN 之前,在這個極限的時間裏,用户又向 Reactor 提交了任務,所以還需要最後一次調用 confirmShutdown() 將在這個極限時間內提交的任務執行完畢。

以上邏輯步驟就是真正優雅關閉的精髓所在,確保任務全部執行完畢,保證業務無損。

在我們優雅處理流程介紹完了之後,下面就是關閉 Reactor 的流程了:

Reactor 會在 SHUTDOWN 狀態下,將 Selector 進行關閉。

    @Override
protected void cleanup() {
try {
selector.close();
} catch (IOException e) {
logger.warn("Failed to close a selector.", e);
}
}

清理 Reactor 線程中遺留的所有 ThreadLocal 緩存。

FastThreadLocal.removeAll();

將 Reactor 的狀態由 SHUTDOWN 改為 ST_TERMINATED 狀態。 此時 Reactor 就算真正的關閉了

 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);

用户線程可能會調用 Reactor 的 awaitTermination 方法阻塞等待 Reactor 的關閉,當 Reactor 關閉之後會調用 threadLock.countDown() 使得用户線程從 awaitTermination 方法返回。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

private final CountDownLatch threadLock = new CountDownLatch(1);

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {

........省略.......

//等待Reactor關閉
threadLock.await(timeout, unit);
return isTerminated();
}

@Override
public boolean isTerminated() {
return state == ST_TERMINATED;
}
}

當這一切處理完畢之後,最後就會設置 Reactor 的 terminationFuture 為 success 。此時註冊在 Reactor 的 terminationFuture 上的 listener 就會被回調。

這裏還記得我們在 《Reactor 在 Netty 中的實現(創建篇)》 一文中介紹的,在 ReactorGroup 中的所有 Reactor 被挨個全部創建成功之後,會向所有 Reactor 的 terminationFuture 註冊一個 terminationListener 。

在 terminationListener 中檢測當前 ReactorGroup 中的所有 Reactor 是否全部完成關閉,如果已經全部關閉,則設置 ReactorGroup 的 terminationFuture 為Success。此刻 ReactorGroup 關閉流程結束,Netty 正式優雅謝幕完畢~~


public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

//Reactor線程組中的Reactor集合
private final EventExecutor[] children;
//記錄關閉的Reactor個數,當Reactor全部關閉後,才可以認為關閉成功
private final AtomicInteger terminatedChildren = new AtomicInteger();
//ReactorGroup關閉future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args)
{

........挨個創建Reactor........

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
//當所有Reactor關閉後 才認為是關閉成功
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

........省略........
}

}

到現在為止,Netty 的整個優雅關閉流程,筆者就為大家詳細介紹完了,下圖為整個優雅關閉的完整流程圖,大家可以對照下面這副總體流程圖在回顧下我們前面介紹的源碼邏輯。

Reactor優雅關閉總流程.png

6.5 Reactor 的狀態變更流轉

在本文的最後,筆者再來帶着大家回顧下 Reactor 的狀態變更流程。

Reactor的狀態變更.png
  • 在 Reactor 被創建出來之後狀態為 ST_NOT_STARTED。

  • 隨着第一個異步任務的提交 Reactor 開始啟動隨後狀態為 ST_STARTED 。

  • 當調用 shutdownGracefully 方法之後,Reactor 的狀態變為 ST_SHUTTING_DOWN 。表示正在進行優雅關閉。此時用户仍可向 Reactor 提交異步任務。

  • 當 Reactor 中遺留的任務全部執行完畢之後,Reactor 的狀態變為 ST_SHUTDOWN 。此時如果用户繼續向 Reactor 提交異步任務,會被拒絕,並收到 RejectedExecutionException 異常。

  • 當 Selector 完成關閉,並清理掉 Reactor 線程中所有的 TheadLocal 緩存之後,Reactor 的狀態變為 ST_TERMINATED 。

總結

到這裏關於優雅關閉的前世今生筆者就位大家全部交代完畢了,信息量比較大,需要好好消化一下,很佩服大家能夠一口氣看到這裏。

本文我們從進程優雅啟停方案開始聊起,以優雅關閉的實現方案為起點,先是介紹了優雅關閉的底層基石-內核的信號量機制,從內核又聊到了 JVM 的 ShutdownHook 原理以及執行過程,最後通過三個知名的開源框架為案例,分別從 Spring 的優雅關閉機制聊到了 Dubbo 的優雅關閉,最後通過 Dubbo 的優雅關閉引出了 Netty 優雅關閉的詳細實現方案,前後呼應。

好了,本文的內容就到這裏了,大家辛苦了,相信大家認真看完之後一定會收穫很大,我們下篇文章見~~~