全鏈路非同步Rest客戶端 ESA RestClient

語言: CN / TW / HK

ESA Stack(Elastic Service Architecture) 是OPPO雲端計算中心孵化的技術品牌,致力於微服務相關技術棧,幫助使用者快速構建高效能,高可用的雲原生微服務。產品包含高效能Web服務框架、RPC框架、服務治理框架、註冊中心、配置中心、呼叫鏈追蹤系統,Service Mesh、Serverless等各類產品及研究方向。

當前部分產品已經對外開源:

開源主站:

https://www.esastack.io

Github:

https://github.com/esastack

RestClient 專案地址:

https://github.com/esastack/esa-restclient

RestClient 文件地址:

https://www.esastack.io/esa-restclient

ESA RestClient

ESA RestClient 是一個基於 Netty 的全鏈路非同步事件驅動的高效能輕量級的HTTP客戶端。

以下簡稱RestClient

1、Quick Start

Step1:新增依賴

<dependency>

<groupId>io.esastack</groupId>

<artifactId>restclient</artifactId>

<version>1.0.0</version>

</dependency>

Step2: 構建RestClient併發送請求處理響應

final RestClient client = RestClient.ofDefault(); //快速建立RestClient,各項配置均為預設配置。

//如果使用者想自定義一些配置,則可以使用RestClient.create()來進行自定義配置。

client.post("http://127.0.0.1:8081/")

.entity("Hello Server") //設定請求體

.execute() //執行請求邏輯

.thenAccept((response)-> { //非同步處理響應

try {

System.out.println(response.bodyToEntity(String.class)); //呼叫response.bodyToEntity(Class TargetClass)來 Decode 響應,

//TargetClass為期望的響應型別

} catch (Exception e) {

e.printStackTrace();

}

});

2、功能特性

· Http1/H2/H2cUpgrade/Https

· Encode 與 EncodeAdvice

· Decode 與 DecodeAdvice

· RestInterceptor

· 大檔案傳送

· 請求級別讀超時

·請求級別重試

· 請求級別重定向

· 100-expect-continue

· Multipart

· Metrics

· more …

2.1 Encode 與 EncodeAdvice

2.1.1 Encode

RestClient會自動根據使用者的 Headers 與 Entity 等選擇合適的Encoder進行Encode。其內建了下面這些Encoder:

·Json

jackson(預設)

fastjson

gson

·ProtoBuf

·File

·String

·byte[]

除此之外RestClient也支援使用者自定義Encoder。

2.1.1.1 使用Json Encoder

指定contentType為

MediaType.APPLICATION_JSON,將自動使用Json Encoder來對Entity來進行Encode。示例如下:

final RestClient client = RestCient.ofDefault();

client.post("localhost:8080/path")

.contentType(MediaTpe.APPLICATION_JSON)

.entity(new Person("Bob""male"))

.execute();

2.1.1.2 使用ProtoBuf Encoder

指定contentType為ProtoBufCodec.PROTO_BUF,且Entity型別為com.google.protobuf.Message的子類時,將自動使用ProtoBuf Encoder來對Entity來進行Encode。示例如下:

final RestClient client = RestClient.ofDefault();

client.post("localhost:8080/path")

.contentType(ProtoBufCodec.PROTO_BUF)

.entity(message)

.execute();

2.1.1.3 使用File Encoder

當Entity型別為File時,將自動使用File Encoder來對Entity來進行Encode。示例如下:

final RestClient client = RestClient.ofDefault();

client.post("localhost:8080/path")

.entity(new File("tem"))

.execute();

2.1.1.4 自定義Encoder

當RestClient內建的Encoder無法滿足使用者需求時,使用者可以自定義Encoder,示例如下:

public class StringEncoder implements ByteEncoder {



@Override

public RequestContent<byte[]> doEncode(EncodeContext<byte[]> ctx) {

if (MediaType.TEXT_PLAIN.equals(ctx.contentType())) {

if (ctx.entity() != null) {

return RequestContent.of(((String) ctx.entity()).getBytes(StandardCharsets.UTF_8));

} else {

return RequestContent.of("null");

}

}

//該Encoder無法Encode這種型別,將Encode工作交給下一個Encoder

return ctx.next();

}

}

使用者可以將自定義的Encoder直接繫結到請求或者Client上,同時也支援使用者通過SPI的方式載入Encoder,具體參見文件:《RestClient 配置Encoder》

2.1.1.5 Encode執行時機

見請求處理完整流程中的Encoder。

2.1.2 EncodeAdvice

使用者可以通過EncodeAdvice在Encode前後插入業務邏輯,來對要Encode的 Entity 或者 Encode後的RequestContent 進行修改替換等操作。

2.1.2.1 示例

public class EncodeAdviceImpl implements EncodeAdvice {

@Override

public RequestContent<?> aroundEncode(EncodeAdviceContext ctx) throws Exception {

//...before encode

RequestContent<?> requestContent = ctx.next();

//...after encode

return requestContent;

}

}

使用者可以將自定義的EncodeAdvice直接繫結到Client上,同時也支援使用者通過SPI的方式載入EncodeAdvice,具體參見文件:《RestClient 配置EncodeAdvice》

2.1.2.2 執行時機

見請求處理完整流程中的EncodeAdvice。

2.2 Decode 與 DecodeAdvice

2.2.1 Decode

RestClient會自動根據使用者的 Headers 與 期望Entity型別 等選擇合適的Decoder進行Decode。RestClient內建了下面這些Decoder:

·Json

jackson(預設)

fastjson

gson

·ProtoBuf

·String

·byte[]

除此之外RestClient也支援使用者自定義解碼器。

2.2.1.1 使用Json Decoder

當Response的contentType為

MediaType.APPLICATION_JSON,將自動使用Json Decoder來來進行Decode。

final RestClient client = RestClient.ofDefault();

client.get("localhost:8080/path")

.execute()

.thenAccept((response)-> {

try {

//當 MediaType.APPLICATION_JSON.equals(response.contentType()) 時將自動使用Json Decoder

System.out.println(response.bodyToEntity(Person.class));

} catch (Exception e) {

e.printStackTrace();

}

});

2.2.1.2 使用ProtoBuf Decoder

當Response的contentType為

ProtoBufCodec.PROTO_BUF,

且response.bodyToEntity()傳入的型別為com.google.protobuf.Message的子類時,將自動使用ProtoBuf Decoder來進行Decode。

final RestClient client = RestClient.ofDefault();

client.get("localhost:8080/path")

.execute()

.thenAccept((response)-> {

try {

//當 ProtoBufCodec.PROTO_BUF.equals(response.contentType()),且 Person 為 Message 的子類時,將自動使用ProtoBuf Decoder

System.out.println(response.bodyToEntity(Person.class));

} catch (Exception e) {

e.printStackTrace();

}

});

2.2.1.3 自定義Decoder

當RestClient內建的Decoder無法滿足使用者需求時,使用者可以自定義Decoder,示例如下:

public class StringDecoder implements ByteDecoder {



@Override

public Object doDecode(DecodeContext<byte[]> ctx) {

if (String.class.isAssignableFrom(ctx.targetType())) {

return new String(ctx.content().value());

}

return ctx.next();

}

}

使用者可以將自定義的Decoder直接繫結到請求或者Client上,同時也支援使用者通過SPI的方式載入Decoder,具體參見文件:《RestClient 配置Decoder》

2.2.1.4 執行時機

見請求處理完整流程中的Decoder。

2.2.2 DecodeAdvice

使用者可以通過DecodeAdvice在Decode前後進行來插入業務邏輯,來對要解碼的 ResponseContent 或者 Decode後的物件 進行修改替換等操作。

2.2.2.1 示例

public class DecodeAdviceImpl implements DecodeAdvice {

@Override

public Object aroundDecode(DecodeAdviceContext ctx) throws Exception {

//...before decode

Object decoded = ctx.next();

//...after decode

return decoded;

}

}

使用者可以將自定義的DecodeAdvice直接繫結到Client上,同時也支援使用者通過SPI的方式載入DecodeAdvice,具體參見文件:《RestClient 配置DecodeAdvice》

2. 2.2.2 執行時機

見請求處理完整流程中的DecodeAdvice。

2.3 RestInterceptor

使用者可以使用RestInterceptor在請求傳送前和響應接收後來插入業務邏輯。RestClient支援通過builder配置和SPI載入兩種方式配置RestInterceptor。

2.3.1 Builder配置

在構造RestClient時傳入自定義的RestInterceptor例項,如:

final RestClient client = RestClient.create()

.addInterceptor((request, next) -> {

System.out.println("Interceptor");

return next.proceed(request);

}).build();

2.3.2 SPI

2.3.2.1 普通SPI

RestClient支援通過SPI的方式載入RestInterceptor介面的實現類,使用時只需要按照SPI的載入規則將自定義的RestInterceptor放入指定的目錄下即可。

2.3.2.2 RestInterceptorFactory

如果使用者自定義的RestInterceptor對於不同RestClient的配置有不同的實現,則使用者可以實現RestInterceptorFactory介面,並按照SPI的載入規則將自定義的RestInterceptorFactory放入指定的目錄下即可。

public interface RestInterceptorFactory {

Collection<RestInterceptor> interceptors(RestClientOptions clientOptions);

}

在RestClient構建時將呼叫

RestInterceptorFactory.interceptors(RestClientOptions clientOptions),該方法返回的所有RestInterceptor都將加入到構建好的RestClient中。

2.3.2.3 執行時機

見請求處理完整流程中的RestInterceptor。

2.4 大檔案傳送

當檔案較小時,可通過直接將檔案內容寫入請求body來發送檔案。但是當檔案內容過大時,直接寫入會有OOM風險。

為了解決這個問題,RestClient藉助底層的Netty使用NIO以零拷貝的方式傳送檔案,避免了OOM的同時又減少了資料的多次拷貝。

使用者只需要簡單的介面呼叫便可使用該功能:

final RestClient client = RestClient.ofDefault();

final String entity = client.post("http://127.0.0.1:8081/")

.entity(new File("bigFile"))

.execute();



2.5 讀超時

RestClient支援請求級別的讀超時,同時也支援Client 級別的讀超時。預設讀超時為6000L。

2.5.1 Client級別讀超時

Client級別的讀超時將對該Client下的所有請求生效,具體配置方式如下:

final RestClient client = RestClient.create()

.readTimeout(3000L)

.build();

2.5.2 Request級別讀超時

當Request設定了讀超時,其資料將覆蓋Client設定的讀超時,具體配置方式如下:

final RestClient client = RestClient.ofDefault();



client.get("http://127.0.0.1:8081/")

.readTimeout(3000L)

.execute()

.thenAccept((response)-> {

try {

System.out.println(response.bodyToEntity(String.class));

} catch (Exception e) {

e.printStackTrace();

}

});

2.6 重試

RestClient支援請求級別的重試,同時也支援Client 級別的重試。

預設情況下,RestClient僅會對所有丟擲連線異常的請求進行重試(防止服務端的服務為非冪等),其中:最大重試次數為3(不包括原始請求),重試間隔時間為0。使用時,可以通過自定義RetryOptions引數更改重試次數、重試條件、重試間隔時間等。

2.6.1 Client級別重試

Client級別的重試將對該Client下的所有 Request 生效,使用時,可以通過自定義RetryOptions引數更改重試次數、重試條件、重試間隔時間等。具體配置方式如下

final RestClient client = RestClient.create()

.retryOptions(RetryOptions.options()

.maxRetries(3)

//設定每次重試的間隔時間

.intervalMs(retryCount-> (retryCount+1) * 3000L)

//判斷是否要重試

.predicate((request, response, ctx, cause) -> cause != null)

.build())

.connectionPoolSize(2048)

.build();

2.6.2 Request級別重試

當Request設定了重試次數,其資料將覆蓋Client設定的重試次數,具體配置方式如下:

final RestClient client = RestClient.ofDefault();



client.get("http://127.0.0.1:8081/")

.maxRetries(3)

.execute()

.thenAccept((response)-> {

try {

System.out.println(response.bodyToEntity(String.class));

} catch (Exception e) {

e.printStackTrace();

}

});

2.7 重定向

預設情況下,RestClient會對響應狀態碼為301,302,303,307,308的請求重定向,其中:最大重定向次數為5(不包含原始請求)。使用時,可以通過maxRedirects更新重定向次數或者禁用(maxRedirects=0)重定向功能。

2.7.1 Client設定重定向

Client級別的重定向將對該Client下的所有 Request 生效,具體配置方式如下:

final RestClient client = RestClient.create()

.maxRedirects(3)

.build();

2.7.2 Request設定重定向覆蓋Client的設定

當Request設定了重定向次數,其資料將覆蓋Client設定的重定向次數,具體配置方式如下:

final RestClient client = RestClient.ofDefault();



client.get("http://127.0.0.1:8081/")

.maxRedirects(3)

.execute()

.thenAccept((response)-> {

try {

System.out.println(response.bodyToEntity(String.class));

} catch (Exception e) {

e.printStackTrace();

}

});

2.8 其它功能

如果使用者想對RestClient的功能有進一步瞭解,可以參考:《RestClient 功能文件》。

3 效能表現

3.1 測試場景

服務端為一個Echo伺服器,客戶端分別使用RestClient、Apache HttpAsyncClient 、 OK Httpclient均使用POST請求,請求體內容為固定字串: OK,響應體內容也為固定字串:OK。

3.2 機器配置

客戶端

OS 

記憶體(G)

CPU 核數

Client

CentOS Linux release 7.7.1908 (Core)

4

Server

CentOS Linux release 7.7.1908 (Core)

16 

8

3.3 JVM引數

-Xms1024m -Xmx1024m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70

3.4 客戶端版本

客戶端

 RestClient  

Apache HttpAsyncClient

OK Http Client

版本

1.0.0

4.9.2

3.5 測試方法

如何測試非同步客戶端的效能?這是我們在效能測試前要面對的第一個問題,下面是我們對於該問題的一些思考:

能否for迴圈發起同步請求,同時使用多執行緒來達到框架的請求處理極限,並將該極限視為客戶端的最佳TPS?

一般來說,使用者既然選擇非同步客戶端,肯定大部分時間都會使用非同步的方式去發起請求,使用同步方式進行測試的結果並不能代表非同步時客戶端的效能。因此對於非同步客戶端的使用者而言,同步方式測試的最終結果並沒有很大的參考價值。

因此這種方式並不適合用來對非同步客戶端進行效能測試。

能否使用單執行緒for迴圈非同步發起請求,直接將這個時候的TPS視為客戶端的最佳TPS?

非同步客戶端非同步發起請求時,發起請求的方法返回的非常快(因為請求執行的過程主要在IO執行緒池中進行)。儘管只使用單執行緒,如果一直for迴圈非同步發起請求,請求發起的速度也會比IO執行緒池處理請求的速度快得多,這會導致大量請求在程式中的某個地方(如獲取連線)堆積,從而導致程式報錯,或效能低下。

因此這種方式也並不適合用來對非同步客戶端進行效能測試。

那麼應該如何測試非同步客戶端的效能呢?

非同步客戶端專為非同步而生,使用者既然選擇非同步客戶端,肯定大部分時間都會使用非同步的方式發起請求,因此對於非同步客戶端而言,使用非同步的方式去測試其效能是一種更加合適的方式。

問題的關鍵在於在測試過程中,如何避免過快地發起非同步請求導致發起請求的速度超過框架的處理能力。

主要問題確定了,那麼答案基本也就確定了。要避免過快地發起非同步請求,我們可以想辦法調整非同步請求發起的速度,對於調整非同步請求的發起速度,我們可以嘗試用以下兩種方式:

 ·for迴圈週期性地傳送一定次數的非同步請求後,sleep一會兒,然後再繼續發起非同步請求。我們可以通過 控制sleep的時間 和 控制多少個請求間隔進行sleep 兩個變數來控制非同步請求的發起速率。

 ·for迴圈週期性地傳送一定次數的非同步請求後,傳送一個同步請求,然後再繼續發起非同步請求。用同步請求去代替sleep的時間,該同步請求執行完恰好說明了請求佇列中的請求都已經排隊結束。其實原理是相同的,但這樣控制的變數更少一些,僅需要控制發起多少個非同步請求後發起一次同步請求(即:一個週期內非同步請求次數與同步請求次數的比例)。

上面兩種方法都可以控制非同步請求的發起速率,最終我們選擇使用第二種方法來控制非同步請求的發起速率,因為第二種方式需要控制的變數更少,這樣我們的測試過程也會更加簡單。

因此最終我們的測試方法為:

使用非同步與同步交替的方式來發起請求,不斷調整一個週期內非同步請求與同步請求的比例,在每個比例下調整客戶端的各項配置,使其達到最佳的TPS,記錄每個比例下,框架的最佳TPS,直到找到增加 非同步請求與同步請求的比例 時,框架的TPS不再上升,甚至下降的拐點,該拐點即為框架的效能極限點。

3.6 測試結果

上圖中,橫座標為非同步請求與同步請求的比例,縱座標為TPS,通過上圖我們可以看出:

RestClient:隨著非同步與同步請求比例增大而先增大後減小,非同步與同步請求比例為800時,TPS最佳,為111217.98。

Apache HttpAsyncClient:隨著非同步與同步請求比例增大而先增大後減小,非同步與同步請求比例為800時,TPS最佳,為 83962.54。

OK Httpclient :隨著非同步與同步請求比例增大而先增大後減小,非同步與同步請求比例為300 時,TPS最佳,為 70501.59。

3.7 結論

RestClient在上面場景中最佳TPS 較 Apache HttpAsyncClient的最佳TPS高 32%,較OK Httpclient的最佳TPS高57% 。

4 架構設計

4.1 設計原則

 ·高效能:持續不懈追求的目標 & 核心競爭力。

·高擴充套件性:開放擴充套件點,滿足業務多樣化的需求。

 ·全鏈路非同步:基於CompletableStage提供完善的非同步處理能力。

4.2 結構設計

上圖為RestClient的結構圖,我們由上到下依次介紹一下各個部分的含義:

4.2.1RestInterceptorChain

RestInterceptorChain為RestInterceptor的集合,使用者呼叫請求時,將依次經過RestInterceptorChain中的所有RestInterceptor。使用者可以通過實現RestInterceptor中的getOrder()方法來指定其在RestInterceptorChain中的排序。

4.2.2EncodeAdviceChain

EncodeAdviceChain為EncodeAdvice的集合,在Encode前,將依次經過EncodeAdviceChain中的所有EncodeAdvice。使用者可以通過實現EncodeAdvice中的getOrder()方法來指定其在EncodeAdviceChain中的排序。

4.2.3 EncoderChain

EncoderChain為Encoder的集合,在Encode時,將依次經過EncoderChain中的所有Encoder,直到某個Encoder直接返回Encode的結果(即:其可以Encode該請求)。使用者可以通過實現Encoder中的getOrder()方法來指定其在EncoderChain中的排序。

4.2.4DecodeAdviceChain

DecodeAdviceChain為DecodeAdvice的集合,在Decode前,將依次經過DecodeAdviceChain中的所有DecodeAdvice。使用者可以通過實現DecodeAdvice中的getOrder()方法來指定其在DecodeAdviceChain中的排序。

4.2.5DecoderChain

DecoderChain為Decoder的集合,在Decode時,將依次經過DecoderChain中的所有Decoder,直到某個Decoder直接返回Decode的結果(即:其可以Decode該響應)。使用者可以通過實現Decoder中的getOrder()方法來指定其在DecoderChain中的排序。

4.2.6NettyTransceiver

NettyTransceiver 是 RestClient與其底層框架Neety之間連線的橋樑,在介紹其之前,需要一些預備知識,我們先來簡單介紹一下這些預備知識:

4.2.6.1Channel & ChannelPool &ChannelPools

Channel :Channel是Netty網路操作抽象類,它聚合了一組功能,包括但不限於網路的讀、寫,客戶端發起連線、主動關閉連線,鏈路關閉,獲得通訊雙方的網路地址等。它也包含了Netty框架相關的一些功能,包括獲取該Channel的EventLoop,獲取緩衝分配器ByteBufAllocator和pipeline等。

ChannelPool:ChannelPool用於快取Channel,它允許獲取和釋放Channel,並充當這些Channel的池,從而達到複用Channel的目的。在RestClient中,每一個Server host對應一個ChannelPool。

ChannelPools:ChannelPools用於快取ChannelPool。在RestClient中,當一個Server host長時間沒有被訪問時,其所對應的ChannelPool將會被視作快取過期,從而被回收資源。

4.2.6.2EventLoop & EventLoopGroup

EventLoop:EventLoop在Netty中被用來執行任務來處理在Channel的生命週期內發生的事件。在RestClient中,一個EventLoop對應了一個執行緒。

EventLoopGroup:EventLoopGroup為一組EventLoop,其保證將多個任務儘可能地均勻地分配在多個EventLoop上。

4.2.6.3Epoll

Epoll是Linux核心的可擴充套件I/O事件通知機制,包含下面這三個系統呼叫。

int epoll_create(int size);

在核心中建立epoll例項並返回一個epoll檔案描述符(對應上圖中 EpollEventLoop中的epollFD)。在最初的實現中,呼叫者通過 size 引數告知核心需要監聽的檔案描述符數量。如果監聽的檔案描述符數量超過 size, 則核心會自動擴容。而現在 size 已經沒有這種語義了,但是呼叫者呼叫時 size 依然必須大於 0,以保證後向相容性。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

向 epfd 對應的核心epoll 例項新增、修改或刪除對 fd 上事件 event 的監聽。op 可以為 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分別對應的是新增新的事件,修改檔案描述符上監聽的事件型別,從例項上刪除一個事件。

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

當 timeout 為 0 時,epoll_wait 永遠會立即返回。而 timeout 為 -1 時,epoll_wait 會一直阻塞直到任一已註冊的事件變為就緒。當 timeout 為一正整數時,epoll 會阻塞直到計時 timeout 毫秒終了或已註冊的事件變為就緒。因為核心排程延遲,阻塞的時間可能會略微超過 timeout 毫秒。

Epoll運作流程:

1、程序先通過呼叫epoll_create來建立一個epoll檔案描述符(對應上圖中 EpollEventLoop中的epollFD)。epoll通過mmap開闢一塊共享空間,該共享空間中包含一個紅黑樹和一個連結串列(對應上圖epollFD中對應的Shared space)。

2、程序呼叫epoll的epoll_ctl add,把新來的連結的檔案描述符放入紅黑樹中。

3、當紅黑樹中的fd有資料到了,就把它放入一個連結串列中並維護該資料可寫還是可讀。

上層使用者空間(通過epoll_wait)從連結串列中取出所有fd,然後對其進行讀寫資料。

4.2.6.4NettyTransceiver初始化

當RestClient剛完成初始化時,NettyTransceiver也剛完成初始化,其初始化主要包含下面兩部分:

·初始化 ChannelPools,剛初始化的ChannelPools為空,其內部不含有任何ChannelPool。

 ·初始化EpoolEventLoopGroup,

EpoolEventLoopGroup包含多個EpoolEventLoop。每個EpoolEventLoop都包含下面這三個部分:

executor:真正執行任務的執行緒。

taskQueue:任務佇列,使用者要執行的任務將被加入 到該佇列中,然後再被executor執行。

epollFD:epoll的檔案描述符,在EpoolEventLoop建立時,呼叫epoll_create來建立一個epoll的共享空間,其對應的檔案描述符就是epollFD。

4.2.6.5NettyTransceiver傳送請求

當第一次傳送請求時:NettyTransceiver將會為該Server host建立一個ChannelPool(如上圖中的ChannelPool1),並快取到channelPools中(預設10分鐘內該Server host沒有請求則視為快取過期,其對應的ChannelPool將被從channelPools中刪除)。ChannelPool在初始化時,主要包含下面兩部分:

·初始化channelDeque,用於快取channel,獲取channel就是從channelDeque中拿出一個channel。

 ·在EventLoopGroup中選定一個EventLoop作為executor,該executor用來執行獲取連線等操作。之所以ChannelPool需要一個固定的executor來執行獲取連線等操作,是為了避免出現多個執行緒同時獲取連線的情況,從而不需要對獲取連線的操作進行加鎖。

ChannelPool初始化完成後,則將由executor從ChannelPool中獲取channel,初次獲取時,由於ChannelPool中還沒有channel,則將初始化第一個channel,channel的初始化步驟主要包含下面幾步:

·建立連線,將連線封裝為channel。

 ·將channel對應的連線通過epoll_ctl add方法加入到EpollEventLoopGroup中的一個EpollEventLoop的epollFD對應的共享空間的紅黑樹中。

·將channel放到對應ChannelPool的channelDeque中。

初始化channel完成後,executor則將初始化好的channel返回,由繫結該channel的EpollEventLoop(即初始化channel第二步中所選定的EpollEventLoop)繼續執行傳送請求資料的任務。

4.2.6.6NettyTransceiver接收響應

4.2.7.1 執行緒模型的進一步優化

上面的執行緒模型為我們當前版本的執行緒模型,也是Netty自帶連線池的執行緒模型。但是這種執行緒模型的效能一定是最高的嗎?

這個問題的答案應該是否定的,因為儘管 ChannelPool 中指定一個 EventLoop 作為 executor 來執行獲取 Channel 的操作可以使得 獲取Channel 的過程無多執行緒爭搶,但是卻引入了下面這兩個問題:

· 獲取Channel到Channel.write()之間大概率會進行一次EventLoop 切換 (有可能會將 獲取Channel 與 Channel.write() 分配到同一個EventLoop ,如果分配到同一個EventLoop,則不需要進行EventLoop 切換 ,所以這裡說大概率會切換),這次切換是有一定的效能成本的。

 ·EventLoopGroup中的 EventLoop任務分配不均勻。因為channelPool中獲取連線的那個 EventLoop在獲取連線的同時還要處理資料的收發,比其他EventLoop多做一些工作,該EventLoop也成為了效能瓶頸點。在我們實際測試當中,也的確發現有一個EventLoop的執行緒CPU利用率較其它EventLoop更高一些。

那麼更優越的執行緒模型是怎樣的呢?通過上面的分析,我們覺得它應該要滿足下面兩點:

·獲取Channel 到 Channel.write() 之間無執行緒切換。

 ·每個EventLoop的任務分配均勻。

基於我們的需求,我們可以得出最佳的結構模型與執行緒模型應該為下面這種:

優化後的結構模型:

如上圖所示:一個 ChannelPool 由多個 ChildChannelPool 構成(個數 = IO執行緒個數),一個ChildChannelPool與一個 EventLoopGroup繫結,該EventLoopGroup僅含有一個 EventLoop (即一個ChildChannelPool對應一個EventLoop)。

優化後的執行緒模型:

如上圖所示:先在業務執行緒中執行一些操作並獲取

ChannelPool ,及選取一個 ChildChannelPool (選取的實現類似於 EventLoopGroup.next()實現,其保證了ChildChannelPool 的均勻分配),然後通過 ChildChannelPool來獲取 Channel (該過程在ChildChannelPool 對應的 EventLoop中執行),然後呼叫Channel.write()

(該過程也在ChildChannelPool 對應的 EventLoop 中執行) 。

上述過程巧妙的達成了我們一開始所需要的高效能執行緒模型的兩點:

 ·獲取Channel 到 Channel.write() 之間無執行緒切換 —— 由於ChildChannelPool 中的EventLoopGroup僅有一個EventLoop ,其建立的Channel 也只能繫結該EventLoop ,因此獲取Channel與Channel.write()都只能在該EventLoop 種執行,從而沒有了執行緒切換。

 ·每 個 EventLoop任務分配均勻 —— 由於ChildChannelPool 是被均勻地從 ChannelPool 中獲取的(該過程與EventLoopGroup.next() 的過程類似),而一個ChildChannelPool 剛好對應了一個EventLoop ,從而使得請求任務被均勻分配。

實踐中我們也通過一個Demo進行了測試:發現採用上面這種執行緒模型與結構模型,使得 RestClient的效能在當前版本的基礎上又提升了20%左右。預計下個版本中RestClient將會提供上面這種執行緒模型與結構模型。

其它效能優化的一些設計

5、Netty

RestClient基於Netty編寫,Netty自帶的一些高效能特性自然是RestClient高效能的基石,Netty常見特性均在RestClient中有所運用:

·Epoll

·Channel & ChannelPool

 ·EventLoop & EventLoopGroup

·ByteBuf & PooledByteBufAllocator

 ·Future & Promise

 ·FastThreadLocal &InternalThreadLocalMap

· ...

其中:Epoll、 Channel & ChannelPool、EventLoop & EventLoopGroup 我們在該篇文件的結構設計部分已經有過講解,這裡不再對其做過多解釋,下面我們主要來看看其它幾個部分:

5.1 ByteBuf & PooledByteBufAllocator

Netty使用了即易於使用又具備良好效能的ByteBuf來替代ByteBuffer。這裡不對ByteBuf進行詳細的介紹,主要簡單介紹RestClient中如何利用ByteBuf來提高效能以獲得更好地使用者體驗:

·傳送請求時,使用PooledByteBufAllocator來分配ByteBuf,其池化了ByteBuf的例項以提高效能並最大限度地減少記憶體碎片。

 ·接收響應時,使用CompositeByteBuf,它提供了一個將多個緩衝區表示為單個合併緩衝區的虛擬表示,減少了當響應分批次到來時聚合響應產生的不必要地資料拷貝。

5.2 Future & Promise

Future & Promise 為Netty非同步的基石,這裡不對Future & Promise 進行詳細的介紹,主要介紹RestClient中對於Future & Promise 一些相關的技術上取捨。

RestClient利用Future & Promise來實現資料包收發時的非同步,並在面向使用者時將Future & Promise轉化成CompletionStage。由此實現了從資料包收發 到使用者編解碼的整個請求鏈路的非同步化。

5 .3 

WhyCompletionStage ,Not Future & Promise?

CompletionStage是Java8新增的一個介面,用於非同步執行中的階段處理,其大量用在Lambda表示式計算過程中,目前只有CompletableFuture一個實現類。

比起Netty的Future & Promise,Java開發者更加熟悉CompletionStage,且CompletionStage的介面功能也更加強大,使用者可以借其更加靈活地實現業務邏輯。

5.4  Why CompletionStage ,Not CompletableFuture?

之所以使用CompletionStage

而不使用CompletableFuture。是因為 CompletionStage 是介面,而CompletableFuture為 CompletionStage 的實現,使用 CompletionStage 更符合面向介面程式設計的原則。同時使用者也可以使用CompletionStage.toCompletableFuture()來將CompletionStage轉化為CompletableFuture。

5.5

How To Combine Future & Promise 

With CompletionStage?

在使用者呼叫請求傳送時,我們構建了一個CompletionStage,並在執行Netty處理請求與響應邏輯返回的Future中增加Listener,在該Listener中結束CompletionStage。通過這樣實現了將 Future & Promise 與 CompletionStage 結合,從而實現整個請求鏈路的非同步化。

對這塊感興趣的使用者可以檢視io.esastack.httpclient.core.netty.HttpTransceiverImpl中的handle()方法,該方法中完成了 Future到CompletionStage的轉化。

5.6 FastThreadLocal&InternalThreadLocalMap

FastThreadLocal通過將ThreadLocal中使用雜湊結構的ThreadLocalMap改為了直接使用陣列結構的InternalThreadLocalMap。ThreadLocal與FastThreadLocal結構圖大致如下:

ThreadLocal結構圖

FastThreadLocal結構圖

如上圖所示,比起ThreadLocalMap,InternalThreadLocalMap直接根據 index 來獲取值、設定值的做法更加簡單,且直接使用陣列的複雜度更低(雖然 ThreadLocalMap也是陣列結構,但是其在陣列的存取操作外還封裝了大量hash計算以及防止hash碰撞的相關操作)。因此FastThreadLocal獲得了更高的效能。

RestClient中均使用FastThreadLocal代替ThreadLocal以獲取更高的效能。

5.7 Encode & Decode

與大多數Http客戶端框架不同,RestClient不僅支援將Java物件Encode成byte[],還支援將Java物件Encode成其他底層Netty支援的物件,如:File、MultipartBody等,未來還將會支援ChunkInput用來支援將請求體比較大的請求分塊傳送。

之所以這麼設計,是因為如果我們僅僅支援將Java物件Encode成byte[],那麼當Encode後的byte[]資料過大時,將會出現OutOfMemoryException。使用者在傳送大檔案或者本身請求體較大的請求時,都會出現這個問題。

為了解決這個問題,RestClient通過讓使用者可以將Java物件Encode成File或者ChunkInput來解決這一類問題。當用戶將Java物件Encode成File時,RestClient將會藉助底層的Netty使用NIO以零拷貝的方式傳送檔案,避免了OOM的同時又減少了資料的多次拷貝。

同理當用戶將Java物件Encode成ChunkInput時,RestClient將會分塊傳送資料,避免資料一次性全部載入進記憶體,從而避免OOM的情況。(PS:ChunkInput當前版本暫不支援,但已留出擴充套件點,將在下一版本支援)

Decode時也做了同樣的優化,由於原理相同這裡就不再展開講解了。

6、結語

儘管RestClient主要只涉及發起請求這個簡單的功能,但是“麻雀雖小,五臟俱全”,它考慮到了效能優化的方方面面,同時在 介面設計、程式碼整潔、功能完善 幾個方面上也儘量做到了毫不妥協。

它還是一個年輕的專案,歡迎各路技術愛好者們加入,一同探討學習與進步。

作者簡介

Without     OPPO雲端計算中心雲原生組後端工程師

深度參與ESA Restlight、ESA RestClient、 ServiceKeeper等 個高效能開源框架專案

推薦閱讀

|開源高效能Web服務框架ESA Restlight

|YARN的介紹及實踐探索