大畫 Spark :: 網路 (7)-Spark 網路中的“四次握手”Executor 註冊到 Driver 過程中的 TransportCli...

語言: CN / TW / HK

回顧

上一篇把 Executor 註冊到 Driver 的過程進行了詳盡的描述。並且把四次往復的過程用圖和程式碼都做了說明,雖然後面的註冊 Executor 的部分沒有詳細再畫圖,但是起過程和第一次確認 Driver 端服務的過程大體相同,如有問題可以給我留言我們來互動溝通。

本章主旨

上述過程中,在 Executor 的 client 端是如何構建了 socket,如何傳送的請求,這部分細節是本章要探討的主要內容。

這部分內容,其實是我第一篇 Spark 的原始碼文章中就講過的,但是當時講的方法有點囉嗦,很多同學看完告訴我還是太硬核了,參考這裡。當時還手工的畫了一個下圖。

所以我考慮再三後,重新把 spark 的內容進行了梳理後,寫的現在的大畫 spark 系列。本次再講解的話,主要還是注重畫圖,減少大幅貼程式碼。

從 Executor 傳送請求的流程

這個過程中,有兩個地方相對優雅一些

  • client 端的懶載入(初始化)

  • 所謂懶載入,即 client 不是在 Executor 端啟動後就直接初始化的,而是在真正的使用過程中進行的初始化,而整個從 Executor→Driver 的傳送訊息的過程中,在 Executor 端也是一個生產者消費者的過程,即訊息會放入一個 queue,由相應的 send 處理來 poll 出然後進行傳送處理。在這個過程中,第一次連線服務端,則會初始化 client,即懶載入,並且,初始化 client 的過程也是一個非同步的過程。會有一個專門的執行緒服務來進行 client 的初始化,當初始化結束後,會持續之前的傳送流程,直到整個傳送結束。

  • client 的 pool 化策略

  • pool 化,即池化的策略。當前的 Executor 對於 Driver 來講是一個 client,在 spark 的檔案儲存管理等角度,它還會作為邏輯上的“client”存在,所以要對於一個 Executor 可能會連線不同的多個 Server,對於每一個 Server 都會用一個連線池來儲存這些 connections,即,對於每一個目標 server 都有存在一組連線池,每次使用的時候從其中隨機拿出一個連線即可。如果這個連線已經被初始化,則直接使用,如果沒有初始化,會通過 Netty 進行一系列的 client 端的初始化,並把初始化後的連線放入連線池。

懶載入與傳送的細節過程

如上圖所示,整個過程可以看作是 1-16 個過程。我舉的例子是第一次連線 Driver 的場合,所以這 16 個步驟都會走到,而連線一次以後,後續因為 client 已經存在,初始化 client 的操作就會被省略。

  1. 從 Executor→Driver 的連線是始於 CoarseGrainedExecutorBackendonStart 方法,這個在上一篇中有介紹

  2. 它呼叫的是 NettyRpcEnvasyncSetupEndpointRefByURI 方法

  3. 進而利用一個 EndpointRef 進行呼叫,所有的 EndpointRef 都是 NettyRpcEndpointRef 的例項,只是傳入的 Endpoint 的 name 不同,通過這個 name 會在目標的 server 去匹配到底哪一個 Endpoint 的例項解析相應的請求

  4. NettyRpcEndpointRef 中會呼叫回 NettyRpcEnvask 方法

  5. 進而呼叫 postToOutbox 的方法,這個方法通過名字也可以看出,其引出了一個新的結構體,就是 OutBox ,所有從 client 端發出去的訊息都是通過這個 OutBox 發出去的

  6. 在 OutBox 中,傳送訊息本身是按照 send 的流程開始的

  7. 從 send 會進入到 drainOutbox 的方法內,這個方法顧名思義,就是要開始對 Outbox 中的訊息進行傳送操作

  8. 在這個方法中會有一個判斷,即判斷 OutBox 中持有的 client 變數是否為 null ,這個 client 變數即 TransportClient 的例項,當第一次進入到這裡的時候,client 一定是 null,所以判斷 is null 一定是 yes,所以會走到第 9 步

  9. 因為沒有 client,所以就要初始化出一個 client,即 launchConnectTask 這個方法的作用。而初始化 client 的過程是一個非同步的過程,即會通過 nettyEnv.*clientConnectionExecutor*.submit 去啟動一個執行緒 task 來去做 client 的初始化,而原來的執行緒的操作會被 return

  10. 通過 call 方法的填充,使得 client 可以被初始化

  11. 這裡通過 call 可以呼叫到 NettyRpcEnvcreateClient 方法,從而可以初始化出一個 client,也就是 TransportClient 的例項

  12. 初始化好的 client 就會放入到 OutBox 中的 client 中

  13. 當初始化好 client 後,這個新建的執行緒 task 還會呼叫回剛才的 drainOutbox 方法,即和步驟 7 呼叫的一致

  14. 這個步驟和步驟 8 也是一致的,但是此時,client 已經被初始化過了,所以判斷是否為 null 走的是 No 的分支

  15. 從而呼叫起 RpcOutboxMessagesendWith 的方法

  16. 進而呼叫起 TransportClientsendRpc 方法,通過這個方法, 可以利用 Netty 的 Channel 把訊息傳送出去,並且儲存發總的 requestId ,方便收到 response 的時候判斷是之前哪一個 reqeust 的訊息,從而回掉髮送訊息時快取的 callback 方法,從而完成一次一來一回的傳送接收過程

通過以上的過程,Executor 就建立起了和 Driver 端的連線,通過這個連線,後續可以繼續通過上述的 4 的 ask 方法來發送 RpcRequestMessage ,如果是傳送的 OneWayMessage ,則使用 send 方法。從英文中的 ask 和 send 也可以分辨出來,ask 是詢問,需要 answer,而 send 只是傳送,不需要回答,所以從方法名也可以看出是否需要 response。具體 RpcRequestMessageOneWayMessage 的區別參考這裡

關於 client

在 spark 中的 client,具有雙重含義。

並不是只在類似於 Executor 的 Client 端存在,而是

  • Executor 的 Client 端存在 TransportClient

  • Driver 的 Server 端也存在 TransportClient

上面要如何理解呢?首先要明確以下幾個邏輯概念

  1. TransportClient ,並不是一個只存在於物理意義上的 client 的 java 的 clas,而是,在物理上 client 與 server 中都存在,目的是握取作業系統底層的對外聯通的 channel ,通過 TransportClient 找到 channel ,進而向外傳送出訊息

  2. 在物理的 client 端啟動的時候, TransportClient 並沒有被初始化,而是在我上一節講解當中描述的,會有一個 lazy 例項化的過程

  3. 在物理的 server 端啟動的時候, TransportClient 的例項化是根據每連上一個物理 client 而動態創建出來的。這裡需要一些 Netty 與底層網路的基礎知識了,我們不去深究,暫時記住這一點即可,可以參考下面的圖來理解

首先,通過這個圖,我們先 High level 的理解一下資料傳輸的過程,以及 channel 的構建過程

  1. Client 端通過 TransportClient 構建出自己的 channel 聯通 server 端

  2. 而 Server 端收到了 client 端的聯通後,會構建出與 client 端通訊的 server 端的 channel,這裡,如果對底層作業系統熟悉的同學應該知道,雙方的 channel 其實就是 linux 底層的 fd

  3. 構建起 channel 後,後續 client 端與 server 端就會通過這個“無形”的 channel 進行資料的互動

知道了以上的知識基礎後,我們會發現,在 client 與 server 都存在 channel,在 client 端 channel 是屬於 TransportClient 的,在 server 端是如何操作的呢?

答案是,在 server 端也是 TransportClient 持有 channel,這裡,spark 做的還是比較優雅,它的 TransportClient 是公共的,無論是 client 還是 server 都有 TransportClient ,其中都有 channel,它的邏輯含義都是通過 TransportClient 所持有的 channel 可以相互通訊,而這個 channel 和 TransportClient 又是如何配合整體的架構體系所存在的呢,繼續畫圖

**TransportClient 與 channel 是如何在大框架中存在的**

  • 無論是在 client 端還是在 server 端,都會有 **TransportClient** 來持有 channel 來給對方傳送訊息,主要注意的是,訊息並不是只有 client 端傳送給 server 端,一旦 connection 聯通後,雙方是可以對等的給對方傳送訊息的,這是網路底層的基礎原理

  • 當訊息到達任何一方後,都是通過 TransportChannelHandler 來進行第一步的分發處理

再借助一個以前的圖來加深印象,在 driver 代表的 Physical server 與 Executor 代表的 Physical client 的構建與結構,可以看到,雙方其實除了在底層網路的 server 層面有差異,其實其餘部分大體相同

Executor 的 Client 端的傳送 &接收資料動作舉例

  • 紅色線條: executor 傳送訊息出來到 driver 通過 executorTransportClient 把訊息放入 channel ,傳送出來

  • **藍色線條:**訊息在 driver 內的流轉和轉換,最終來到處理訊息的 Endpoint

  • 綠色線條:訊息從 driver 傳送回 client ,通過載 driver 中掌握的 executorEndpointRefTransportClient 把訊息放入 channel ,從而發給 executor

  • 這個過程中,我在最初看原始碼的時候有一個疑問, driver 到底是如何獲取到的 executorEndpointRef 的,或者說,最終是通過 TransportClient 傳送的 response 回去,這個 TransportClientchannel 是如何與 EndpointRef 整合到一起的呢?下一篇會詳盡描述

Driver 的 Server 端的傳送動作舉例

  • 藍色線條:訊息從 DriverEndpoint 主動發出呼叫了持有的 executorEndpointRef

  • 綠色線條: EndpointRefTransportClient 把訊息放入 channel ,從而發給 executorexecutor 接到後呼叫相應的 callback 處理

總結

本篇其實和上一篇差不多,是上一篇的概括版,把 TransportClient 的邏輯細節又做了詳盡的闡述。很多開發同學都會把自己陷入到網路就是 http 協議的怪圈中去,其實底層的網路 client 和 server 一旦聯通之後,雙方的邏輯是相同的,你可以把它比做是 websocket 協議,client→server 與 server→client 是對等的。基於此,在 spark 中,executor(client 端)主動向 driver(server 端)傳送註冊申請,註冊之後,driver 獲取了 executor 的連線(TransportClient 與 channel),才可能通過 driver(server 端)來下發任務給 executor(client 端),這塊的細節,下一篇繼續硬核攻擊。