官方推薦使用的OkHttp網路請求庫全面解析!
theme: smartblue
前言
現在談起網路請求,大家肯定下意識想到的就是 okhttp
或者 retrofit
這樣的三方請求庫。誠然,現在有越來越多的三方庫幫助著我們快速開發,但是對於現在的程式設計師來說,我們不僅要學會如何去用,更重要的是要清楚裡面的原理,瞭解裡面的思想,最後轉化到我們自己的實際程式碼中去。
⚡ okhttp
使用起來還是比較方便,優點也有很多:支援GZIP壓縮
,連線池複用底層TCP
,請求自動重試重定向
...
現在 Google官方 也將原始碼當中的 HttpURLConnection
底層實現改成 okhttp
了,同時 retrofit
的底層也是 okhttp
,足以說明其在日常開發中的重要性。現在我們正式進入今天的正題。
✔️ 本文閱讀時長約為: 20min
OkHttp版本:4.0.1 ----> 注意此版本起是Kotlin版本哦
okhttp的基本使用流程
``` // 1.建立client OkHttpClient client = new OkHttpClient().newBuilder() .cookieJar(CookieJar.NO_COOKIES) .callTimeout(10000, TimeUnit.MILLISECONDS) .build();
// 2.建立request Request request = new Request.Builder() .url("http://10.34.12.156:68080/admin-api") .addHeader("Content-Type", "application/json") .get(); .build();
// 3.構建call物件 Call call = client.newCall(request);
// 4.1呼叫call物件的同步請求方法 Response response = call.execute();// response物件中儲存的有返回的響應引數
// 4.2呼叫call物件的非同步請求方法 call.enqueue(new Callback() { @Override public void onFailure(@NonNull Call call, @NonNull IOException e) { Log.d(TAG, "onFailure: ");// 失敗回撥 }
@Override
public void onResponse(@NonNull Call call, @NonNull Response response) {
Log.d(TAG, "onResponse: ");// 成功回撥
}
}); ```
okhttp 的好處就在於,我們完成一次網路請求最少只需要接觸 OkHttpClient
,Request
,Call
這三個物件,顯然是很輕鬆的。此外還需強調一點,OkHttpClient 和 Request 都是通過建造者模式構建的,這樣的好處就在於使用者可以根據自己的需求輕鬆簡潔的配置一些可選引數,而不必通過傳統方式將不需要的引數寫成 null
。
同步請求和非同步請求的流程
1.同步請求的執行流程
我們將 OkhttpClient
中的 newCall()
作為入口,開啟整個同步請求的過程。
```
// OkHttpClient.kt
// 這裡是構建一個 RealCall 物件
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
``
RealCall實現了
Call介面,我們先來看看
Call` 介面長什麼樣。
```` // Call.kt interface Call : Cloneable {
// 同步請求方法
@Throws(IOException::class)
fun execute(): Response
// 非同步請求方法 fun enqueue(responseCallback: Callback)
// OkHttpClient實現了Factory介面
// 所以才有newCall方法
fun interface Factory {
fun newCall(request: Request): Call
}
}
`
現在我們通過 `newCall()` 得到了一個 `RealCall` 物件,然後就能通過 `RealCall` 當中的 `execute()` 和 `enqueue()` 進行網路請求。
// RealCall.kt
override fun execute(): Response {
// 一個call物件只能執行一次execute方法 // 這裡用CAS思想進行比較,可以提高效率 check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
// 這裡主要是個監聽器,表示開始進行網路請求了
callStart()
// 重點關注這塊
try {
// 通過分發器進行任務分發
// 其實這裡還體現不出分發器的效果,僅僅是將當前請求加入到一個同步隊列當中
client.dispatcher.executed(this)
// 通過 getResponseWithInterceptorChain() 獲得相應結果
return getResponseWithInterceptorChain()
} finally {
// 完成一些收尾工作,在同步請求中,幾乎沒什麼用
client.dispatcher.finished(this)
}
}
現在來看看同步請求中分發器做了什麼工作呢?
// Dispatcher.kt
// 正在執行的同步請求佇列
private val runningSyncCalls = ArrayDeque
// 簡單的將當前請求加入到同步請求佇列中 @Synchronized internal fun executed(call: RealCall) { runningSyncCalls.add(call) }
// finish方法 --> 前面說的首尾工作方法 // 但是在同步請求中用處不大 internal fun finished(call: RealCall) { finished(runningSyncCalls, call) }
private fun
// 這裡是進行首尾工作的 // 主要體現在非同步請求中,因此這個方法先放一放 val isRunning = promoteAndExecute()
// 進行空閒回撥方法
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
``
可以看到,在整個同步請求的過程中,分發器僅僅是將當前的請求加入到一個同步請求佇列中,請求完成後再將其移除。因為在同步請求中
finished()方法只有一個回撥作用,因此我們將它放一放,重點看一看非同步請求中的
finished()`。
2.非同步請求的執行流程
非同步請求就比同步請求稍微複雜了一點,我們仍然是從 RealCall
中看起。
```
// RealCall.kt
override fun enqueue(responseCallback: Callback) { // 一個call物件只能執行一次execute方法 // 這裡用CAS思想進行比較,可以提高效率 check(executed.compareAndSet(false, true)) { "Already Executed" }
// 這裡面依舊加上了監聽 callStart()
// 構建一個 AsyncCall物件,再交給dispatcher進行分發流程
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
`AsyncCall` 是 `RealCall` 的內部類, 它實現了 **Runnable** 介面,主要是為了能線上程池中去執行它的 `run()` 方法。
// RealCall.kt
internal inner class AsyncCall( private val responseCallback: Callback ) : Runnable { @Volatile var callsPerHost = AtomicInteger(0) private set
fun reuseCallsPerHostFrom(other: AsyncCall) { // callPerHost代表了連線同一個host的連線數 // 此連線數必須小於5 this.callsPerHost = other.callsPerHost }
val host: String get() = originalRequest.url.host
val request: Request get() = originalRequest
val call: RealCall get() = this@RealCall
// 將asyncCall新增到執行緒池中去執行的方法 fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 執行緒池去執行當前AsyncCall物件的run方法
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 收尾工作
// 其實內部呼叫的是 promoteAndExecute()
client.dispatcher.finished(this)
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
// getResponseWithInterceptorChain() 獲得請求的響應結果
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 請求成功的回撥
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// 請求失敗的回撥
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
// 進行收尾工作
// 相比同步請求的finished方法,這兒更重要
client.dispatcher.finished(this)
}
}
}
}
現在我們回到 `Dispatcher` 中去。
// Dispatcher.kt
// 準備執行的非同步請求佇列
private val readyAsyncCalls = ArrayDeque
// 正在執行的非同步請求佇列
private val runningAsyncCalls = ArrayDeque
internal fun enqueue(call: AsyncCall) { synchronized(this) { // 加當前asyncCall加到準備執行的非同步請求佇列中 readyAsyncCalls.add(call)
if (!call.call.forWebSocket) {
// 這裡是得到連線同一個 host 的請求數
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
} // dispatcher進行分發call任務的方法 promoteAndExecute() }
// 關鍵方法,dispatcher進行任務分發的方法
// 進行收尾工作時,也是呼叫的它
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
// 需要開始執行的任務集合
val executableCalls = mutableListOf
i.remove()
// 如果拿到了符合條件的asyncCall物件,就將其callPerHost值加1
// callPerHost代表了連線同一個host的數量
asyncCall.callsPerHost.incrementAndGet()
// 加到需要開始執行的任務集合中
executableCalls.add(asyncCall)
// 將當前call加到正在執行的非同步隊列當中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) { // 遍歷每一個集合中的asyncCall物件 // 將其新增到執行緒池中,執行它的run方法 val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) }
return isRunning
}
``
現在我們通過
Dispatcher將
AsyncCall` 物件通過挑選,加到了執行緒池中。挑選的限制有兩個:
1.當前執行的總請求數要小於64個。
2.對於連線的同一個host請求,要保證數量小於5。
現在,我們再回頭看看將 AsyncCall
物件加到執行緒池後的一些細節吧!
```
// Dispatcher.kt
// 將asyncCall新增到執行緒池中去執行的方法 fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 這裡是之前自定義了建立了一個ExecutorService
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 這裡也是會執行收尾工作
client.dispatcher.finished(this)
}
}
}
@get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { // !!這裡的corePoolSize是 0 // !!阻塞佇列是 SynchronousQueue executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! }
``
✊ 我們先來看
executeOn()方法,它的主要工作就是執行新增到執行緒池的
AsyncCall物件的
run()方法,去進行網路請求。其次我們目光移動到
finally語句塊,會發現每次執行完
run()方法後,即完成網路請求後,都會去執行這個
finished()方法。前面講到過,內部其實是再次呼叫了
promoteAndExecute()` 方法。那這是為什麼呢?
還記得到我們從準備執行的非同步佇列中挑選一些 AsyncCall
物件拿到執行緒池中執行嗎?如果記得,那你是否還記得我們是有挑選條件的,正因如此,可能在準備執行的非同步請求佇列中會有一些 AsyncCall
物件不滿足條件仍然留在佇列裡!那我們難道最後就不執行這些網路請求了嗎?當然不是!原來每完成一次網路請求就會再次觸發 Dispatcher
去分發 AsyncCall
物件!原來如此。
✊ 然後我們再來看看這裡用到的執行緒池是一個什麼樣的執行緒池。在上面我貼出來的程式碼中可以看到,這個執行緒池的 corePoolSize
是 0
,BlockingQueue
是 SynchronousQueue
,這樣構建出來的執行緒池有什麼特殊之處嗎?熟悉執行緒池的同學都應該知道,當任務數超過了 corePoolSize
就會將其加到阻塞隊列當中。也就是說這些任務不會立馬執行,而我們的網路請求可不想被阻塞著,因此這裡的 corePoolSize
就設定成了 0
。BlockingQueue
設定成 SynchronousQueue
也是類似道理,SynchronousQueue
是不儲存元素的,只要提交的任務數小於最大執行緒數就會立刻新起執行緒去執行任務。
3.okhttp網路請求執行過程總結
總結一下整個 okhttp
網路請求的整個過程。首先通過我們通過 構造者 的方式構建好了 OkHttpClient
和 Request
物件,然後呼叫 OkHttpClient
物件的 newCall()
方法得到一個 RealCall
物件,最後再呼叫其 execute()
或者 enqueue()
方法進行同步或者非同步請求。
然後如果是同步請求,Dispatacher
分發器去只是簡單的將其加入到正在執行的同步請求佇列中做一個標記,如果是非同步請求就會根據 兩個條件 去篩選合適的請求,並將其傳送給一個特定的執行緒池中去進行網路請求,最後通過 getResponseWithInterceptorChain()
得到最終結果。
請求任務之五大攔截器各司其職
okhttp的又一大特點是整個請求流程是由五大攔截器一層層分發下去,最後得到結果再一層層返回上來。 就像這樣:
okhttp
內建了五大攔截器,這五大攔截器各司其職,通過責任鏈模式將請求逐層分發下去,每層完成自己這層該做的事,最後拿到相應結果逐層往上返回結果:
1️⃣. RetryAndFollowUpInterceptor:請求失敗自動重試,如果 DNS
設定了多個ip地址會自動重試其餘ip地址。
2️⃣. BridgeInterceptor:會補全我們請求中的請求頭,例如Host
,Cookie
,Accept-Encoding
等。
3️⃣. CacheInterceptor:會選擇性的將響應結果進行儲存,以便下次直接讀取,不再需要再向伺服器索要資料。
4️⃣. ConnectInterceptor:建立連線並得到對應的socket;管理連線池,從中存取連線,以便達到連線複用的目的。
5️⃣. CallServerInterceptor:與伺服器建立連線,具體進行網路請求,並將結果逐層返回的地方。
注意:這五個是系統內建的攔截器,我們也可以通過 addInterceptor()
加入我們自己寫的攔截器,後面會提到。
現在我們從開啟攔截器的入口函式 getResponseWithInterceptorChain()
看起,
```
// RealCall.kt
// 五大攔截器的起始入口
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// 用一個集合儲存所有的攔截器
val interceptors = mutableListOf
// 構建RealInterceptorChain物件,我們正是通過此物件將請求逐層往下傳遞的 val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis )
var calledNoMoreExchanges = false
try {
// 呼叫RealInterceptorChain的proceed()方法,將請求向下一個聯結器傳遞
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
// 放回響應結果
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
``
從這個方法中我們大概可以總結出,它將所有的攔截器包括使用者自定義的攔截器全部通過一個集合儲存了下來,然後構建出了
RealInterceptorChain物件,並呼叫其
proceed()` 方法開始了攔截器逐層分發工作。
那麼它是怎麼做到逐層分發的呢?其實很簡單,每一個攔截器中都會通過 proceed()
方法再構建一個 RealInterceptorChain
物件,然後呼叫 intercpt
去執行下個攔截器中的任務,如此迴圈,最終走到最後一個攔截器後退出。
``` // RealInterceptorChain.kt -----> 實現了 Chain 介面
override fun proceed(request: Request): Response { // 檢查是否走完了所有的攔截器,是則退出 check(index < interceptors.size
... // 這個方法就是再次構建了 RealInterceptorChain 物件 ==> next // 去執行下個攔截器中的任務 val next = copy(index = index + 1, request = request)// 這個方法內部就一行程式碼 new RealInterceptorChain() val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS") // 通過呼叫intercept(next)去執行下一個攔截器中的任務 val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null") ... // 將結果放回到上一個攔截器中 return response } ``` 以上我們搞清楚了攔截器是如何一步一步往下傳遞任務,並逐層往上返回結果的,現在我們來具體看看每個攔截器都做了什麼事情。
1.RetryAndFollowUpInterceptor攔截器
``` // RetryAndFollowUpInterceptor.kt // 所有攔截求都實現了 Interceptor 介面
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf
try {
// 呼叫下一個攔截器,即 BridgeInterceptor
// 整個請求可能會失敗,需要捕獲然後重試重定向,因此有一個try catch
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) { //1.進行重試
// 1.1 路線異常,檢查是否需要重試
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
// 失敗繼續重試
continue
} catch (e: IOException) {
// 1.2 IO異常 HTTP2才會有ConnectionShutdownException 代表連線中斷
//如果是因為IO異常,那麼requestSendStarted=true (若是HTTP2的連線中斷異常仍然為false)
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
// 失敗繼續重試
continue
}
// priorResponse:上一次請求的響應
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
// 2.根據返回的response進行重定向,構建新的Request物件
val followUp = followUpRequest(response, exchange)
// 2.1 followUp為空,代表沒有重定向,直接返回結果response
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
// 2.2 followUp不為空,但是body中設定了只能請求一次(預設),返回重定向後的結果response
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
// 重試次數大於20次,丟擲異常
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
// 將之前重定向後的Request物件賦值給request進行重新請求
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
} } ``` 簡單來說,RetryAndFollowUpInterceptor攔截器幫我們幹了兩件事。第一是重試,第二是重定向。
✅ 我們先來看看什麼情況下它會進行重試。 ``` // RetryAndFollowUpInterceptor.kt
// 這個方法就是來判斷當前請求是否需要重試的 private fun recover( e: IOException, call: RealCall, userRequest: Request, requestSendStarted: Boolean ): Boolean { // 構建OkHttpClient時配置不重試,則返回false if (!client.retryOnConnectionFailure) return false
// 返回false // 1、如果是IO異常(非http2中斷異常)表示請求可能發出 // 2、如果請求體只能被使用一次(預設為false) if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// 返回false // 協議異常、IO中斷異常(除Socket讀寫超時之外),ssl認證異常 if (!isRecoverable(e, requestSendStarted)) return false
// 無更多的路線,返回false if (!call.retryAfterFailure()) return false
// 以上情況都不是,則返回true 表示可以重試 return true } ``` 這裡畫個圖做個總結:
✅ 再來看看如何判斷是否需要重定向的。 ``` @Throws(IOException::class) private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? { val route = exchange?.connection?.route() val responseCode = userResponse.code
val method = userResponse.request.method when (responseCode) { // 407 代理需要授權,通過proxyAuthenticator獲得request,向其中新增請求頭 Proxy-Authorization // 然後構建一個新的request物件返回出去,準備再次請求 HTTP_PROXY_AUTH -> { val selectedProxy = route!!.proxy if (selectedProxy.type() != Proxy.Type.HTTP) { throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy") } return client.proxyAuthenticator.authenticate(route, userResponse) } // 401 伺服器請求需授權,通過authenticator獲得到了Request,新增Authorization請求頭 // 然後構建一個新的request物件返回出去,準備再次請求 HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
// 返回的響應碼是3xx,這就準備進行重定向,構建新的Request物件
HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
return buildRedirectRequest(userResponse, method)
}
// 408 請求超時
HTTP_CLIENT_TIMEOUT -> {
// 使用者設定是否可以進行重試(預設允許)
if (!client.retryOnConnectionFailure) {
return null
}
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
val priorResponse = userResponse.priorResponse
// 如果上次也是因為408導致重試,這次請求又返回的408,則不會再去重試了,直接返回null
if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
return null
}
// 伺服器返回的 Retry-After:0 或者未響應Retry-After就不會再次去請求
if (retryAfter(userResponse, 0) > 0) {
return null
}
// 返回當前的request物件,準備再次請求
return userResponse.request
}
// 503 服務不可用
HTTP_UNAVAILABLE -> {
val priorResponse = userResponse.priorResponse
// 和408相似,如果兩次都是503導致請求重試,那麼這次就不會再重試了,直接返回null
if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
return null
}
// 服務端返回的有Retry-After: 0,則立即重試
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
return userResponse.request
}
return null
}
// 421 當前客戶端的IP地址連線到伺服器的數量超過了伺服器允許的範圍
HTTP_MISDIRECTED_REQUEST -> {
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
if (exchange == null || !exchange.isCoalescedConnection) {
return null
}
// 使用另一個連線物件發起請求
exchange.connection.noCoalescedConnections()
return userResponse.request
}
else -> return null
} } ``` 老規矩,依舊給大家畫張圖,便於理解。
2.BridgeInterceptor攔截器
接下來,來到第二個攔截器 BridgeInterceptor
。這個攔截器前面說過,主要就是用來補全請求頭的,除此之外就是如果響應頭中有Content-Encoding: gzip
,則會用 GzipSource
進行解析。
``` // BridgeInterceptor.kt
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val userRequest = chain.request() val requestBuilder = userRequest.newBuilder()
// 這裡沒有什麼多說的,就是補全請求頭
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
// 去呼叫下一個攔截器,並得到響應結果
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// 根據響應頭中的 Content-Encoding,判斷是否需要gzip解析
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
// 進行gzip解析
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
``
橋接攔截器其實工作內容也很簡單,在請求之前,向我們的請求頭中新增必要的引數,然後拿到請求的響應後,根據響應頭中的引數去判斷是否需要
gzip解析,如果需要則用
GzipSource` 去解析就好了。
3.CacheInterceptor攔截器
在講 CacheInterceptor
攔截器之前,我們先來了解一下 HTTP的快取規則。 我們按照其行為將其分為兩大類:強快取和協商快取。
1️⃣強快取:瀏覽器並不會將請求傳送給伺服器。強快取是利用 http
的返回頭中的 Expires
或者 Cache-Control
兩個欄位來控制的,用來表示資源的快取時間。
2️⃣協商快取:瀏覽器會將請求傳送至伺服器。伺服器根據 http
頭資訊中的 Last-Modify/If-Modify-Since
或 Etag/If-None-Match
來判斷是否命中協商快取。如果命中,則 http
返回碼為 304
,客戶端從本地快取中載入資源。
搞清楚 Http的快取策略
後,我們來看看 CacheInterceptor
攔截器做了什麼工作吧。
```
// CacheInterceptor.kt
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val call = chain.call() val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() // 代表需要發起請求 val networkRequest = strategy.networkRequest // 代表直接使用本地快取 val cacheResponse = strategy.cacheResponse
...
// networkRequest 和 cacheResponse 都是null // 說明伺服器要求使用快取,但是本地沒有快取,直接失敗 if (networkRequest == null && cacheResponse == null) { return Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(HTTP_GATEWAY_TIMEOUT) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_RESPONSE)// 構建一個空的response返回過去 .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build() }
// networkRequest為null cacheResponse不為null // 說明使用強快取,成功 if (networkRequest == null) { return cacheResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build().also { listener.cacheHit(call, it) } }
...
var networkResponse: Response? = null try { // 走到這裡說明需要請求一次,判斷是協商快取還是重新去伺服器上獲取資源 // 因此 呼叫下一個攔截器去繼續請求 networkResponse = chain.proceed(networkRequest) } finally { if (networkResponse == null && cacheCandidate != null) { cacheCandidate.body?.closeQuietly() } }
// 協商快取 // 如果我們本地有快取,並且伺服器放回給我們304響應碼,直接使用本地的快取 if (cacheResponse != null) { if (networkResponse?.code == HTTP_NOT_MODIFIED) { val response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build()
networkResponse.body!!.close()
return response
...
} else {
cacheResponse.body?.closeQuietly()
}
}
// 走到這,說明響應碼是200 表示我們需要用這次新請求的資源 val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build()
if (cache != null) { if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // 將本次最新得到的響應存到cache中去 val cacheRequest = cache.put(response) return cacheWritingResponse(cacheRequest, response).also { } }
...
}
// 將這次新請求的資源返回給上一層攔截器
return response
}
``
⚡ 總結一下快取攔截器處理快取的流程:首先得到
RealInterceptorChain物件,然後通過它再得到兩個很重要的物件:
networkRequest和
cacheResponse。
networkRequest代表去發起一個網路請求,
cacheResponse代表使用本地快取。通過這兩個物件是否為
null來判斷此次請求是使用直接快取,還是去請求新的資源,還是去使用協商快取。最後就是會更新快取,把每次新請求的資源都重新儲存至
cache` 中。
4.ConnectInterceptor攔截器
連線攔截器主要就是做建立連線和連線複用的工作,它會從連線池中取出符合條件的連線,以免重複建立,從而提升請求效率。
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// 獲取連線 Exchange:資料交換(封裝了連線)
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
// 繼續呼叫下一個攔截器去請求資料
return connectedChain.proceed(realChain.request)
}
}
可以看到,這個連線攔截器中的程式碼比較少,主要的邏輯都在 initExchange()
方法當中,這個方法的作用就是拿到一個連線物件建立連線。
``` // RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange { synchronized(this) { check(expectMoreExchanges) { "released" } check(!responseBodyOpen) check(!requestBodyOpen) }
val exchangeFinder = this.exchangeFinder!! // ExchangeCodec: 編解碼器 // find():查詢連線Realconnection // 內部呼叫 findHealthy 去找到可用連線 val codec = exchangeFinder.find(client, chain) // Exchange:資料交換器 包含了exchangecodec與Realconnection val result = Exchange(this, eventListener, exchangeFinder, codec) this.interceptorScopedExchange = result this.exchange = result synchronized(this) { this.requestBodyOpen = true this.responseBodyOpen = true }
if (canceled) throw IOException("Canceled")
// 就是拿到了ExChange物件返回出去
return result
}
``
那麼
find()是如何找到連線的呢?我們直接點到
findConnection()` 裡面去。
``` // ExchangeFinder.kt
private fun findConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection { // 當前請求被取消,拋異常 if (call.isCanceled()) throw IOException("Canceled")
...
// 關閉連線 toClose?.closeQuietly() eventListener.connectionReleased(call, callConnection) }
// 1.從連線池當中去尋找連線,達到複用的目的 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
...
// 建立新的連線 val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { // 2.走到這裡說明沒有找到合適的連線,因此現在去建立一個新的連線 newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null }
// 對新建立的連線進行儲存,以便下次複用 synchronized(newConnection) { connectionPool.put(newConnection) call.acquireConnectionNoEvents(newConnection) }
... return newConnection } ``` 看到這裡,我們發現其實建立連線有兩種辦法,一種是從 連線池當中複用舊的連線,另一種就是去 建立一個新的連線 。
1️⃣ 連線池預設的空閒連線數是5個,空閒的連線時間也是5分鐘,如果空閒連線數超出了5個,那麼就清理掉最近最久未使用的連線數,直到連線池中的連線數小於等於5個。現在我們來看看連線池是個什麼樣子的吧。
``` // RealConnectionPool.kt
class RealConnectionPool(
taskRunner: TaskRunner,
private val maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) {
//
private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)
private val cleanupQueue: TaskQueue = taskRunner.newQueue() // 這是個任務相當於定時任務 自動cleanup去清理多餘或者超時的連線 private val cleanupTask = object : Task("$okHttpName ConnectionPool") { override fun runOnce() = cleanup(System.nanoTime()) }
// 從連線池中取到合適的連線的方法
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List
// 空閒連線的數量 fun idleConnectionCount(): Int { return connections.count { synchronized(it) { it.calls.isEmpty() } } }
// 總的連線數 fun connectionCount(): Int { return connections.size }
// 往連線池中存連線的方法 fun put(connection: RealConnection) { connection.assertThreadHoldsLock() connections.add(connection) // 放入連線池時,也會啟動一次清理任務 cleanupQueue.schedule(cleanupTask) }
// 定期做清理工作的方法
fun cleanup
(now: Long): Long {
// 在使用的連線數量
var inUseConnectionCount = 0
// 空閒的連線數量
var idleConnectionCount = 0
// 最長閒置的連線
var longestIdleConnection: RealConnection? = null
// 最大閒置時間
var longestIdleDurationNs = Long.MIN_VALUE
// 遍歷每個連線,判斷是否達到清理條件 for (connection in connections) { synchronized(connection) { // 記錄正在使用與已經閒置的連線數 if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++ } else { idleConnectionCount++ // 記錄最長閒置時間的連線longestIdleConnection val idleDurationNs = now - connection.idleAtNs if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs longestIdleConnection = connection } else { Unit } } } }
when { // 最長閒置時間的連線超過了允許閒置時間 或者 閒置數量超過允許數量,清理此連線 longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections -> { val connection = longestIdleConnection!! synchronized(connection) { if (connection.calls.isNotEmpty()) return 0L // No longer idle. if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest. connection.noNewExchanges = true connections.remove(longestIdleConnection) }
connection.socket().closeQuietly()
if (connections.isEmpty()) cleanupQueue.cancelAll()
return 0L
}
// 存在閒置連線,下次執行清理任務在 允許閒置時間-已經閒置時候 後
idleConnectionCount > 0 -> {
return keepAliveDurationNs - longestIdleDurationNs
}
// 存在使用中的連線,下次清理在 允許閒置時間後
inUseConnectionCount > 0 -> {
// 所有點連線都在用,至少需要等待 一個允許閒置時間 後再清理
return keepAliveDurationNs
}
else -> {
// 沒有任何連線返回 -1
return -1
}
} }
}
``
2️⃣ 連線池的部分總的來說就是當要複用連線的時候,呼叫
isEligible()方法,去遍歷比較當前連線的
Host,
url,
ip等是否完全一樣,滿足條件就取出這個連線。另外對連線進行快取時,如果當前的連線數超過了5個,那麼會將最近最久未使用的連線進行清除。
cleanup()` 方法會每過一個清理週期自動做一次清理工作。
我們現在來看如果沒有從連線池中拿到合適的連線,是此時怎樣建立新的連線的。
不過在此之前還是先來看看有關 Http代理
的相關知識。
Http代理 分為普通代理和隧道代理,普通代理就起到一箇中間人的作用,它會在收到客戶端請求報文時,正確處理請求和連線狀態,然後向伺服器傳送行的請求。收到伺服器的結果後,再將響應結果包裝成一個響應體返回給客戶端。而隧道代理是將請求無腦轉發給服務端,隧道代理需要發起 CONNECT 請求,傳送到代理伺服器,代表建立好了隧道,然後將 CONNECT 請求後的所有資料,全部轉發給最終的服務端。
``` // RealConnection.kt
// 建立攔截的方法 fun connect( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, call: Call, eventListener: EventListener ) { ...
while (true) {
...
if (route.requiresTunnel()) {
// 隧道代理,會做兩件事
// 1.connectSocket 連線socket
// 2.傳送CONNECT請求頭 建立隧道代理
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break
}
} else {
// 普通代理 連線socket
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
...
}
``
connectTunnel()內部也是會呼叫
connectSocket(),只不過因為隧道代理會先發一個
CONNECT請求頭,因此比
connectSocket()方法內部多了一個
CONNECT` 請求頭的傳送。
對連線池的流程做個簡單總結:
5.CallServerInterceptor攔截器
``` // CallServerInterceptor.kt
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body val sentRequestMillis = System.currentTimeMillis() // 將請求頭寫入快取中 exchange.writeRequestHeaders(request)
var invokeStartEvent = true var responseBuilder: Response.Builder? = null if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { // 大容量請求體會帶有 Expect: 100-continue 欄位,伺服器識別同意後,才能繼續傳送請求給服務端 if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { // 與伺服器進行請求 exchange.flushRequest() responseBuilder = exchange.readResponseHeaders(expectContinue = true) exchange.responseHeadersStart() invokeStartEvent = false } if (responseBuilder == null) { if (requestBody.isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() requestBody.writeTo(bufferedRequestBody) } else { // 大部分情況都是走這裡,通過IO流把響應結果寫入response中 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } } else { exchange.noRequestBody() if (!exchange.connection.isMultiplexed) { // 沒有響應 Expect:100 continue則阻止此連線得到複用,並且會關閉相關的socket exchange.noNewExchangesOnConnection() } } } else { exchange.noRequestBody() } ... var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code if (code == 100) { // 返回100 表示接收大請求體請求 繼續傳送請求體 // 得到response後返回結果 responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! // 構建響應體 response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() code = response.code }
response = if (forWebSocket && code == 101) { // 如果狀態碼是101並且是webSocket就返回空的response response.newBuilder() .body(EMPTY_RESPONSE) .build() } else { response.newBuilder() .body(exchange.openResponseBody(response)) .build() }
return response
}
``
在
okhttp中,面對比較大的請求體時,會先去詢問伺服器是否接收此請求體,如果伺服器接收並返回響應碼
200,則
okhttp` 繼續傳送請求體,否則就直接返回給客戶端。如果伺服器忽略此請求,則不會響應,最後客戶端會超時丟擲異常。
攔截器部分完畢~
整個 okhttp
功能的實現就在這五個預設的攔截器中,所以先理解攔截器模式的工作機制是先決條件。這五個攔截器分別為: 重試攔截器、橋接攔截器、快取攔截器、連線攔截器、請求服務攔截器。每一個攔截器負責的工作不一樣,就好像工廠流水線,最終經過這五道工序,就完成了最終的產品。
但是與流水線不同的是,okhttp
中的攔截器每次發起請求都會在交給下一個攔截器之前幹一些事情,在獲得了結果之後又幹一些事情。整個過程在請求向是順序的,而響應則是逆序。
寫在最後
篇幅很長,能看到最後很不容易,給自己一個 大大的贊 吧!👏👏
如果覺得寫的不錯😀😀,就給個贊再走吧~
創作實屬不易,你的肯定是我創作的動力,下次見!
如果本篇部落格有任何錯誤的地方,請大家批評指正,不勝感激。