Kafka能抗住百萬併發的祕密...

語言: CN / TW / HK

閱讀本文大約需要 30 分鐘。

大家好,我是 蘇三 , 又跟大家見面了。

上一篇作為專題系列的第一篇,我們深度剖析了關於 Kafka 儲存架構設計的實現細節,今天開啟第二篇,我們來深度剖析下「 Kafka Broker 端網路架構和請求處理流程 」是如何設計的?  相信使用過 Kafka 的朋友都知道其吞吐量可以高達百萬,但很少人理解其中的設計原理。

那麼 Kafka Broker 端網路架構和請求處理到底是使用了哪些高大上的技術?它到底解決了什麼問題?究竟是怎麼解決的?

只有瞭解了這些, 我們才能深刻掌握 Kafka 服務端設計精髓所在,更加深刻理解一個高併發、高效能服務端架構該如何設計。

認真讀完這篇文章,我相信你會對Kafka Broker請求處理流程和網路架構設計實現細節,有更加深刻的理解。

這篇文章乾貨很多,希望你可以耐心讀完。

01 總體概述

要想理解 Kafka Broker 請求處理架構設計,我們需要從簡單請求處理模型來說起。

對於日常系統開發,我們都知道是基於 Request/Response 的模式來實現的, 對於 Kafka 來說, 無論是 Producer 端、Consumer 端 還是 Broker 端,他們之間的請求互動也都是基於「 Request/Response 」模式來完成的。比如,客戶端會通過網路傳送訊息生產請求給 Broker,而 Broker 處理完成後,會發送對應的響應給到客戶端。

下面,我會從自我設計角度出發,如果是我們會如何設計,帶你一步步演化出來「 kafka Broker 的網路請求處理 」架構。

在這個過程中,你會看到 Kafka 在處理請求的過程中會遇到哪些高效能和高併發問題,以及架構為什麼要這樣演進,從而理解 Kafka 這麼設計的意義和精妙之處。

02 順序處理模式

我們從最簡單的網路程式設計思路處理方式講起。

因為對於 Kafka Broker 來說就是用來接收生產者傳送過來的請求,那這個時候最簡單的實現大概是這樣的:

如上述程式碼所示:我們可以理解 Kafka 每個伺服器啟動起來後就是一個 while 迴圈, 不斷的 accept 生產者提交上來的請求, 然後進行處理並存儲到磁碟上,這種方式實現最簡單,也非常好理解,但是這種方式存在2個致命的缺陷?

  1. 請求阻塞:只能順序處理每個請求,即每個請求都必須等待前一個請求處理完畢才能得到處理。

  2. 吞吐量非常差:由於只能順序處理,無法併發,效率太低,所以吞吐量非常差,只適合請求傳送非常不頻繁的系統。

從上面來看很明顯,如果你的 Kafka 系統請求併發量很大,意味著要處理的時間就會越久。那按照前面我們提到的 Kafka「 吞吐量 」的標準,這個方案遠遠無法滿足我們對高效能、高併發的要求。

那有什麼更好的方案可以快速處理請求嗎?

接下來我們可以試著採取這個方案: 獨立執行緒非同步處理模式

03 多執行緒非同步處理模式

既然同步方式會阻塞請求,吞吐量差, 我們可以嘗試著使用獨立執行緒非同步方式進行處理, 即經典的 connection per thread 模型, 那這個時候的實現大概是這樣的:

如上述程式碼所示:同上還是一個 while 迴圈不斷的 accept 生產者提交上來的請求,但是這時候 Kafka 系統會為每個請求都建立一個「 單獨的執行緒 」來處理。

這個實現方案的好處就是:

  1. 吞吐量稍強:相對上面同步方式的方案,一定程度上極大地提高了伺服器的吞吐量。

  2. 非阻塞:它是完全非同步的,每個請求的處理都不會阻塞下一個請求。

但同樣缺陷也同樣很明顯: 即為每個請求都建立執行緒的做法開銷很大 ,在某些高併發場景下會壓垮整個服務。可見,這個方案也只適用於請求傳送頻率很低的業務場景。還是無法滿足我們對高效能、高併發的要求。

既然這種方案還是不能滿足, 那麼我們究竟該使用什麼方案來支撐高併發呢?

這個時候我們可以想想我們日常開發用到的7層負載Nginx或者Redis在處理高併發請求的時候是使用什麼方案呢?

從上面啟發你可以看出,提升系統 I/O 併發效能的關鍵思路就是:「 事件驅動 」。

想必大家已經猜到了,沒錯,就是「 多路複用 」。那麼Kafka 是不是也是採用這種方案來實現呢?

這裡我們先考慮採用基於「 事件驅動 」的設計方案,當有事件觸發時,才會呼叫處理器進行資料處理。

04 Reactor 模式

在高效能網路程式設計領域,有一個非常著名的模式——Reactor模式。那麼何為「 Reactor模式 」,首先它是基於事件驅動的,有一個或多個併發輸入源,有一個Service Handler,有多個Request Handler;這個Service Handler會同步的將輸入的請求輪詢地分發給相應的Request Handler進行處理。

藉助於 Doug Lea(就是那位讓人無限景仰的大爺)的 "Scalable IO in Java" 中講述的Reactor模式。

"Scalable IO in Java" 的地址是:

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

簡單來說, Reactor 模式特別適合應用於處理多個客戶端併發向伺服器端傳送請求的場景 。這裡借用大神 PDF 中的一幅圖來說明 Reactor 架構:

從上面這張圖中,我們可以看出多個客戶端會發送請求給到 Reactor。Reactor 有個請求分發執行緒 Dispatcher,也就是圖中的綠色的 Acceptor,它會將不同的請求下分發到多個工作執行緒中處理。

在這個架構中,Acceptor 執行緒只是用來進行請求分發,所以非常輕量級,因此會有很高的吞吐量。而這些工作執行緒可以根據實際系統負載情況動態調節系統負載能力,從而達到請求處理的平衡性。

基於上面的 Reactor 架構, 我們來看看如果是我們該如何設計 Kafka 服務端的架構?

  1. 這裡我們採用多路複用方案,Reactor 設計模式,並引用 Java NIO 的方式可以更好的解決上面併發請求問題。

  2. 當 Client 端將請求傳送到 Server 端的時候, 首先在 Server 端有個多路複用器( Selector ),然後會啟動一個 Accepter 執行緒將 OP_CONNECT 事件註冊到多路複用器上, 主要用來監聽連線事件到來。

  3. 當監聽到連線事件後,就會在多路複用器上註冊 OP_READ 事件, 這樣 Cient 端傳送過來的請求, 都會被接收到。如果請求特別多的話, 我們這裡進行優化, 建立一個 Read HandlePool 執行緒池

  4. 當 Read HandlePool 執行緒池接收到請求資料後,最終會交給 Handler ThreadPool 執行緒池進行後續處理。比如如果是生產者傳送過來的請求,肯定會解析請求體,處理並最終儲存到磁碟中,待處理完後要返回處理結果狀態, 這時候就由它在多路複用器上註冊 OP_WRITE 事件來完成。這樣多路複用器遍歷到 OP_WRITE 事件後就會將請求返回到 Client 端。

  5. 在上圖中我們看到在整個流程中還有一個 MessageQueue 的佇列元件存在, 為什麼要加這個元件呢?我們可以想象一下, 如果請求量非常大,直接交給 Handler ThreadPool 執行緒池進行處理, 可能會出現該執行緒池處理不過來的情況發生,如果處理不過來,也會出現阻塞瓶頸。所以這裡我們在 Server 端內部也設計一個訊息佇列, 起到一個緩衝的作用,Handler ThreadPool 執行緒池會根據自己的負載能力進行處理。

以上就是我們引用了「 多路複用 」的設計方案,但是 Kafka Broker 端就是這樣的架構設計方案嗎?如果我們是 Kafka 系統架構的設計者,採用這樣的架構設計方案會不會還是有什麼問題,有沒有哪個環節會出現系統性能瓶頸呢?

這是個值得思考的問題, 很考驗你的架構設計能力。

細心的讀者可能會發現:對於 Kafka 這種超高併發系統來說,一個 Selector 多路複用器是 Hold 不住的,從上圖可以得出,我們監聽這些連線、接收請求、處理響應結果都是同一個 Selector 在進行處理,很容易成為系統性能瓶頸。

接下來,我們將進一步進行優化,為了減輕當前 Selector 的處理負擔,引入另外一個Selector 處理佇列,如下圖所示:

  1. 首先上圖是目前我認為最接近 Kafka Broker 真實架構設計方案的。

  2. 整體架構跟上一版的類似,只不過這裡多引入了一個多 Selector 處理佇列,原來的 Selector 只負責監聽連線, 這時候有讀者就會有疑問,請求量超級大的時候,一個 Selector 會不會成為瓶頸呢?這裡可以大可放心, 這時候它的工作非常單一,是完全能 hold 住的。

  3. 那麼對於我們接收請求、處理請求、返回狀態操作都會交由多 Selector 處理佇列,至於這裡到底需要多少個 Selector,又會跟什麼引數和配置有關係,我們後續再進行分析,總之這裡記住有多個 Selector 就行了,這樣系統壓力就會被分散處理。

  4. 另外我們要搞清楚的一點就是對於 Kafka 服務端指的是每個 Broker 節點,如果我們的服務叢集總共有10個節點, 每個節點內部都是上面的這樣的架構,這樣我們就有理由相信如果採用這樣的架構設計方案,是可以支援高併發和高效能的。

架構設計方案演進到這裡,基本上已經差不多了, 接下來我們看看 Kafka 真實超高併發的網路架構是如何設計的

05 Kafka 超高併發網路架構

在上面 Kafka 高效能、高吞吐量架構演進的時候,我們提到了 Java NIO 以及 Reactor 設計模式。實際上,搞透了「 Kafka 究竟是怎麼使用 NIO 來實現網路通訊的 」,不僅能讓我們掌握 Kafka 請求處理全流程處理,也能讓我們對 Reactor 設計模式有更深的理解,還能幫助我們解決很多實際問題。

那麼接下來我們就來深入剖析下 Kafka 的 NIO 通訊機制吧。

我們先從整體上看一下完整的網路通訊層架構,如下圖所示:

  1. 從上圖中我們可以看出,Kafka 網路通訊架構中用到的元件主要由兩大部分構成: SocketServerRequestHandlerPool

  2. SocketServer 元件是 Kafka 超高併發網路通訊層中最重要的子模組。它包含 Acceptor 執行緒、Processor 執行緒和 RequestChannel 等物件,都是網路通訊的重要組成部分。它主要實現了 Reactor 設計模式,主要用來處理外部多個 Clients(這裡的 Clients 可能包含 Producer、Consumer 或其他 Broker)的併發請求,並負責將處理結果封裝進 Response 中,返還給 Clients。

  3. RequestHandlerPool 元件就是我們常說的 I/O 工作執行緒池,裡面定義了若干個 I/O 執行緒, 主要用來執行真實的請求處理邏輯

  4. 這裡注意的是:跟 RequestHandler 相比, 上面所說的Acceptor、Processor 執行緒 還有 RequestChannel 等都不做請求處理, 它們只是請求和響應的「搬運工」

接下來我們來具體聊聊SocketServer中的實現原理,這裡先來講講:

  1. Acceptor 執行緒

  2. Processor 執行緒

以Kafka 2.5版本,原始碼位置:

https://github.com/apache/kafka/blob/2.5.0-rc3/core/src/main/scala/kafka/network/SocketServer.scala

05.1 聊聊 Acceptor 執行緒

在經典的 Reactor 設計模式有個 「Dispatcher」 的角色,主要用來接收外部請求並分發給下面的實際處理執行緒。通過上面分析我們知道在 Kafka 網路架構設計中,這個 Dispatcher 就是「 Acceptor 執行緒 」。

Acceptor 執行緒是用來接收和建立外部 TCP 連線的執行緒。在Broker 端每個 SocketServer 例項只會建立一個 Acceptor 執行緒。它的主要功能就是建立連線,並將接收到的 Request 請求傳遞給下游的 Processor 執行緒處理。

/**
* Thread that accepts and configures new connections. There is one of these per endpoint.
*/

private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup
{
// 1. 建立底層的NIO Selector物件,用來監聽連線建立請求、讀寫請求等
private val nioSelector = NSelector.open()
// 2. Broker端建立對應的ServerSocketChannel例項,然後將Channel註冊到Selector物件上
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// 3. 建立Processor執行緒池
private val processors = new ArrayBuffer[Processor]()
......
/**
* Accept loop that checks for new connection attempts
*/

def run(): Unit = {
//註冊OP_ACCEPT事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
// 等待Acceptor執行緒啟動完成
startupComplete()
try {
// 當前使用的Processor序號,從0開始
var currentProcessorIndex = 0
while (isRunning) {
try {
// 每500毫秒獲取一次就緒I/O事件
val ready = nioSelector.select(500)
// 如果有I/O事件準備就緒
if (ready > 0) {
........
// 呼叫accept方法建立Socket連線
accept(key).foreach { socketChannel =>
........
// 指定由哪個Processor執行緒進行處理
processor = synchronized {
.........
processors(currentProcessorIndex)
}
// 更新Processor執行緒序號
currentProcessorIndex += 1
}
.........
}
}

這裡重點看下 Acceptor 執行緒中三個非常關鍵且重要的屬性和方法:

  1. nioSelector:它就是我們所熟悉的 Java NIO 庫中的 Selector 物件例項,所有網路通訊元件實現 Java NIO 機制的基礎。

  2. processors:通過原始碼我們可以知道在Acceptor 執行緒在初始化時,需要建立對應的 Processor 執行緒池。由此可以得出,Processor 執行緒是在 Acceptor 執行緒中管理和維護的。

  3. run方法:它是處理 Reactor 模式中分發邏輯的主要實現方法。

  1. 從上述原始碼中,我們可以看出 Acceptor 執行緒主要使用了 Java NIO 的 Selector 以及 SocketChannel 的方式迴圈的輪詢準備就緒的 I/O 事件。

  2. 這裡的 I/O 事件主要是指網路連線建立事件即: SelectionKey.OP_ACCEPT

  3. 這樣註冊好事件後,一旦後續接收到連線請求後,Acceptor 執行緒就會指定一個 Processor 執行緒,並將該請求交給它並建立網路連線用於後續處理。

05.2 聊聊 Processor 執行緒

從上面分析我們知道 Acceptor 只是做了請求入口連線處理的,那麼,真正建立網路連線以及分發網路請求是由 Processor 執行緒來完成的。

override def run(): Unit = {
// 等待Processor執行緒啟動完成
startupComplete()
try {
while (isRunning) {
try {
// 建立新連線
configureNewConnections()
// 傳送Response
processNewResponses()
// 執行NIO poll,獲取對應SocketChannel上準備就緒的I/O操作
poll()
// 將接收到的Request放入Request佇列
processCompletedReceives()
.......
} catch {
.........
}
}
} finally {
........
}
}
........
// 預設連線對接大小
val ConnectionQueueSize = 20
// 儲存要建立的新連線資訊
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
// 一個臨時 Response 佇列
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
// Response 佇列
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

從上面 Processor 執行緒原始碼,可以看出 Kafka 的程式碼邏輯實現的非常好,各個子方法的邊界非常清楚。

這裡我們就不展開原始碼分析了, 更深入詳細的等到原始碼分析專題再進行。我們簡單的看下 Processor 執行緒初始化時要做的事情。

看上面程式碼最後部分,我們知道每個 Processor 執行緒在建立時都會建立 3 個佇列。

  1. newConnections 佇列:它主要是用來儲存要建立的新連線資訊,也就是SocketChannel 物件,目前是硬編碼佇列長度大小為20。每當 Processor 執行緒接收到新的連線請求時,都會將對應的 SocketChannel 物件放入佇列,等到後面建立連線時,從該佇列中獲取 SocketChannel,然後註冊新的連線。

  2. inflightResponse 佇列:它是一個臨時的 Response 佇列, 當 Processor 執行緒將 Repsonse 返回給 Client 之後,要將 Response 放入該佇列。它存在的意義:由於有些 Response 回撥邏輯要在 Response 被髮送回 Request 傳送方後,才能執行,因此需要暫存到臨時佇列。

  3. ResponseQueue 佇列:它主要是存放需要返回給Request 傳送方的所有 Response 物件。通過原始碼得知:每個 Processor 執行緒都會維護自己的 Response 佇列。

06 請求處理核心流程剖析

上面深入的剖析了 Kafka 超高併發網路架構 以及 SocketServer 中的 Acceptor 執行緒跟 Processor 執行緒的實現原理, 接下來我們來將請求處理核心流程給串起來。

只有搞透這部分的實現原理,才能幫助我們有針對性的進行 Broker端請求處理的效能調優。

比如:在上面網路架構圖,有兩個引數跟整個流程有關係,分別是 num.network.threads、num.io.threads 。如果我們不掌握請求處理的整個流程,就不能更好的對此進行調整,來達到更高的效能要求。

其中 num.io.threads 就是 I/O 工作執行緒池的大小配置,即 KafkaRequestHandlerPool 執行緒池,它才是「 真正處理 Kafka 請求 」的地方。

以Kafka 2.5版本,原始碼位置:

https://github.com/apache/kafka/blob/2.5.0-rc3/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
/**
* A thread that answers kafka requests.
*/

class KafkaRequestHandler(id: Int, //I/O執行緒序號
brokerId: Int, //所在Broker序號,即broker.id
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger, //I/O執行緒池大小
val requestChannel: RequestChannel, //請求處理通道
apis: KafkaApis, //KafkaApis類,用於真正實現請求處理邏輯的類
time: Time) extends Runnable with Logging
{
......
def run(): Unit = {
while (!stopped) {
val startSelectTime = time.nanoseconds
// 從請求佇列中獲取下一個待處理的請求
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
// 統計執行緒空閒時間
val idleTime = endTime - startSelectTime
// 更新執行緒空閒百分比指標
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

req match {
// 當關閉執行緒請求處理
case RequestChannel.ShutdownRequest =>
......
// 當普通請求到來時
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
// 由KafkaApis.handle方法執行相應處理邏輯
apis.handle(request)
} catch {
....
} finally {
// 釋放請求物件資源
request.releaseBuffer()
}
case null => // continue
}
}
shutdownComplete.countDown()
}
}

下面我們結合 Kafka 超高併發網路架構圖來講解下一個完整請求處理核心流程:

  1. Clients 傳送請求給 Acceptor 執行緒。

  2. Acceptor 執行緒會建立 NIO Selector 物件,並建立 ServerSocketChannel 例項,然後將 ChannelOP_ACCEPT 事件繫結到 Selector 多路複用器上。

  3. Acceptor 執行緒還會預設建立3個大小的 Processor 執行緒池,引數: num.network.threads , 並輪詢的將請求物件 SocketChannel 放入到連線佇列中(newConnections)。

  4. 這時候連線佇列就源源不斷有請求資料了,然後不停地執行 NIO Poll , 獲取對應 SocketChannel 上已經準備就緒的 I/O 事件。

  5. Processor 執行緒向 SocketChannel 註冊了 OP_READ/OP_WRITE 事件,這樣 客戶端發過來的請求就會被該 SocketChannel 物件獲取到,具體就是CompleteReceives。

  6. 這個時候客戶端就可以源源不斷進行請求傳送了,服務端通過 Selector NIO Poll 不停的獲取準備就緒的 I/O 事件。

  7. 然後根據Channel中獲取已經完成的 Receive 物件,構建 Request 物件,並將其存入到 RequestchannelRequestQueue 請求佇列中 。

  8. 這個時候就該 I/O 執行緒池上場了,KafkaRequestHandler 執行緒迴圈地從請求佇列中獲取 Request 例項,然後交由 KafkaApishandle 方法,執行真正的請求處理邏輯,並最終將資料儲存到磁碟中。

  9. 待處理完請求後, KafkaRequestHandler 執行緒會將 Response 物件放入 Processor 執行緒的 Response 佇列。

  10. 然後 Processor 執行緒通過 Request 中的 ProcessorID 不停地從 Response 佇列中來定位並取出 Response 物件,返還給 Request 傳送方。

至此,我們深入剖析完畢 Kafka 網路架構請求「 核心流程 」。

07 系統調優

搞透了 Kafka 超高併發網路架構設計和請求處理核心流程後,我們來聊聊 Broker 端引數調優。

對 Kafka 而言,效能一般是指吞吐量和延時。所以高吞吐量、低延時是我們調優 Kafka 叢集的主要目標。

Broker 端調優主要就是合理地設定 Broker 端引數值,以匹配你的生產環境。另外還有一點要說明的就是「 保證伺服器端和客戶端版本的一致 」,做到這一點,就能獲得很多效能收益了。

num.network.threads

建立 Processor 處理網路請求執行緒個數,建議設定為 Broker 當前CPU核心數*2,這個值太低經常出現網路空閒太低而缺失副本。

num.io.threads

建立 KafkaRequestHandler 處理具體請求執行緒個數,建議設定為Broker磁碟個數*2。

num.replica.fetchers

建議設定為CPU核心數/4,適當提高可以提升CPU利用率及 Follower同步 Leader 資料當並行度。

compression.type

建議採用lz4壓縮型別,壓縮可以提升CPU利用率同時可以減少網路傳輸資料量。

queued.max.requests

在網路執行緒停止讀取新請求之前,可以排隊等待I/O執行緒處理的最大請求個數,生產環境建議配置最少500以上,預設500。

log.flush.xxx

log.flush.scheduler.interval.ms

log.flush.interval.ms

log.flush.interval.messages

這幾個引數表示日誌資料重新整理到磁碟的策略,應該保持預設配置,刷盤策略讓作業系統去完成,由作業系統來決定什麼時候把資料刷盤;如果設定來這個引數,可能對吞吐量影響非常大。

auto.leader.rebalance.enable

表示是否開啟leader自動負載均衡,預設true;我們應該把這個引數設定為false,因為自動負載均衡不可控,可能影響叢集效能和穩定。

08 總結

這裡,我們一起來總結一下這篇文章的重點。

1、對於 Kafka 這樣一個優秀的服務端系統架構來說,應該遵循高可用、高效能、高併發 3 大原則。

2、本文從最簡單的網路程式設計思路出發一步一步演進到 Reactor 設計模式,假設我們就是 Kafka 架構的設計者,我們該如何設計其服務端網路架構。

3、通過本文的深度剖析,提升系統I/O效能的核心是基於「事件驅動」模型實現。

4、在剖析完服務端網路架構後,我們也深度剖析了 SocketServer中兩個最重要的執行緒:Acceptor 執行緒和 Processor 執行緒。

5、接著我們結合 Kafka 超高併發網路架構圖又梳理了 Kafka 請求處理核心流程。

6、最後給大家分析並做了 Broker 端系統調優的方案。

如果我的文章對你有所幫助,還請幫忙點贊、在看、轉發一下,非常感謝!