OkHttp原始碼分析(一)

語言: CN / TW / HK

theme: cyanosis


開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第37天,點選檢視活動詳情

文章中原始碼的OkHttp版本為4.10.0

implementation 'com.squareup.okhttp3:okhttp:4.10.0'

1.簡單使用

  • okHttp的簡單使用程式碼如下:

``` //建立OkHttpClient物件 val client = OkHttpClient().newBuilder().build()

//建立Request物件 val request = Request.Builder() .url("https://wanandroid.com/wxarticle/list/408/1/json") //新增請求的url地址 .build() //返回一個Request物件

//發起請求 fun request() { val response = client .newCall(request) //建立一個Call物件 .enqueue(object : Callback { //呼叫enqueue方法執行非同步請求 override fun onFailure(call: Call, e: IOException) { TODO("Not yet implemented") }

        override fun onResponse(call: Call, response: Response) {
            TODO("Not yet implemented")
        }
    })
}

```

  • 工作流程有四步:
    • 建立OkHttpClient物件
    • 建立Request物件
    • 建立Call物件
    • 開始發起請求,enqueue為非同步請求,execute為同步請求

2.OkHttpClient物件是如何建立的

val client = OkHttpClient().newBuilder().build()

OkHttpClient物件的建立是一個典型的建造者模式,先看一下newBuilder方法做了什麼,原始碼如下:

//建立了一個Builder物件 open fun newBuilder(): Builder = Builder(this)

``` class Builder constructor() { //排程器 internal var dispatcher: Dispatcher = Dispatcher() //攔截器集合 internal val interceptors: MutableList = mutableListOf() //網路攔截器集合 internal val networkInterceptors: MutableList = mutableListOf()

...

} ```

newBuilder中建立了一個Builder物件,Builder物件的建構函式中定義了很多的變數,這裡只保留了3個重要的。

下面看一下build方法做了什麼

//這個this就是上面建立的Builder物件 fun build(): OkHttpClient = OkHttpClient(this)

okHttpClient原始碼如下

``` open class OkHttpClient internal constructor( builder: Builder ) : Cloneable, Call.Factory, WebSocket.Factory {

@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher

@get:JvmName("interceptors") val interceptors: List = builder.interceptors.toImmutableList()

@get:JvmName("networkInterceptors") val networkInterceptors: List = builder.networkInterceptors.toImmutableList()

...

} ```

通過OkHttpClient物件的原始碼可以得知,Builder建立的排程器、攔截器最終都會交給OkHttpClient,這是建造者模式的特定。

3.Request物件是如何建立的

val request = Request.Builder() .url("https://wanandroid.com/wxarticle/list/408/1/json") //新增請求的url地址 .build() //返回一個Request物件

``` open class Builder { //請求地址 internal var url: HttpUrl? = null //請求方法 internal var method: String //請求頭 internal var headers: Headers.Builder //請求體 internal var body: RequestBody? = null

...

} ```

4.建立Call物件

val call = client.newCall(request)

override fun newCall(request: Request): Call { //newRealCall中傳遞了三個引數,第一個引數是OkHttpClient本身,第二個引數request, //第三個不用關注 return RealCall.newRealCall(this, request, forWebSocket = false) }

接著我們先來看一下RealCall是什麼

class RealCall( val client: OkHttpClient, /** The application's original request unadulterated by redirects or auth headers. */ val originalRequest: Request, val forWebSocket: Boolean ) : Call { ... }

從原始碼可知RealCallCall的子類,那麼Call又是什麼呢,往下看

``` //呼叫是準備好要執行的請求。也可以取消呼叫。 //由於該物件表示單個請求/響應對(流),因此不能執行兩次。 interface Call : Cloneable { //返回發起此呼叫的原始請求。 fun request(): Request @Throws(IOException::class)

//同步請求,立即呼叫請求,並阻塞,直到響應可以處理或出現錯誤。
fun execute(): Response

//非同步請求,接受回撥引數
fun enqueue(responseCallback: Callback)

//取消請求
fun cancel()

//如果此呼叫已被執行或進入佇列,則返回true。多次執行呼叫是錯誤的。
fun isExecuted(): Boolean

//是否是取消狀態
fun isCanceled(): Boolean

//超時時間,
fun timeout(): Timeout

//建立一個與此呼叫相同的新呼叫,即使該呼叫已經進入佇列或執行,該呼叫也可以被加入佇列或執行。
public override fun clone(): Call

fun interface Factory {
    fun newCall(request: Request): Call
}

} ```

5.發起請求

  • 以非同步請求為例進行分析

``` call.enqueue(object : Callback { override fun onFailure(call: Call, e: IOException) { println("onFailure:$e") }

override fun onResponse(call: Call, response: Response) {
    println("onResponse:${response.body.toString()}")
}

}) ```

RealCallCall的子類所以enqueue的具體實現是在RealCall

``` override fun enqueue(responseCallback: Callback) { //檢查是否進行了二次請求 check(executed.compareAndSet(false, true)) { "Already Executed" }

//請求後立即呼叫,相當於監聽請求的開始事件
callStart()
//將請求交給排程器來決定什麼時候開始請求
client.dispatcher.enqueue(AsyncCall(responseCallback))

}

private fun callStart() { this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()") eventListener.callStart(this) } ```

  • 疑問:client.dispatcher.enqueue是如何決定什麼時候開始請求的
    • 已知client就是OkHttpClient
    • dispatcher是排程器,先來看一下它的原始碼

``` class Dispatcher constructor() { //併發執行的最大請求數。上面的請求佇列在記憶體中,等待正在執行的呼叫完成。 //如果在呼叫這個函式時有超過maxRequests的請求在執行,那麼這些請求將保持在執行狀態。 @get:Synchronized var maxRequests = 64 set(maxRequests) { require(maxRequests >= 1) { "max < 1: $maxRequests" } synchronized(this) { field = maxRequests } promoteAndExecute() }

//每臺主機可併發執行的最大請求數。這限制了URL主機名的請求。
//注意,對單個IP地址的併發請求仍然可能超過此限制:
//多個主機名可能共享一個IP地址或通過同一個HTTP代理路由。
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
    require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
    synchronized(this) {
        field = maxRequestsPerHost
    }
    promoteAndExecute()
}

//執行緒安全的單例模式,執行緒池的獲取用於執行緒排程。
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
    if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                   SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
}

//定義準備傳送的佇列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()

//定義非同步傳送佇列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()

//定義同步傳送佇列
private val runningSyncCalls = ArrayDeque<RealCall>()

...

} ```

    • 再來看一下dispatcher.enqueue的原始碼

``` internal fun enqueue(call: AsyncCall) { synchronized(this) { //將call物件新增到準備傳送的佇列,這個call物件來自AsyncCall,稍後再講 readyAsyncCalls.add(call)

    //修改AsyncCall,使其共享對同一主機的現有執行呼叫的AtomicInteger。
    if (!call.get().forWebSocket) {
        //共享一個已經存在的正在執行呼叫的AtomicInteger
        val existingCall = findExistingCallWithHost(call.host())
        //統計傳送數量
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
}
//準備傳送請求
promoteAndExecute()

} ```

``` //將符合條件的call從readyAsyncCalls(準備傳送的佇列)新增到runningAsyncCalls(非同步傳送佇列)中 //並在伺服器上執行它們 //不能在同步時呼叫,因為執行呼叫可以呼叫使用者程式碼 //如果排程程式當前正在執行,則為true。 private fun promoteAndExecute(): Boolean { assert(!Thread.holdsLock(this))

//收集所有需要執行的請求
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
    val i = readyAsyncCalls.iterator()
    //遍歷準備傳送的佇列
    while (i.hasNext()) {
        val asyncCall = i.next()

        //判斷已經發送的請求是大於等於最大請求個數64,是則跳出迴圈
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        //判斷併發請求個數是否大於等於最大併發個數5,如果是則跳出迴圈
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

        //從準備佇列中刪除
        i.remove()
        //計數+1
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        //將物件新增到非同步傳送佇列中
        runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
}

for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    //提交任務到執行緒池
    asyncCall.executeOn(executorService)
}

return isRunning

} ```

    • 最終請求是通過AsyncCallexecuteOn傳送出去的,AsyncCall是什麼

internal inner class AsyncCall( private val responseCallback: Callback ) : Runnable { ... }

    • 接收了一個回撥,繼承了Runnable

``` fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock()

//暫時定義為未執行成功
var success = false
try {
    //使用執行緒執行自身
    executorService.execute(this)
    //執行成功
    success = true
} catch (e: RejectedExecutionException) {
    val ioException = InterruptedIOException("executor rejected")
    ioException.initCause(e)
    noMoreExchanges(ioException)
    //傳送失敗
    responseCallback.onFailure(this@RealCall, ioException)
} finally {
    if (!success) {
        //傳送結束
        client.dispatcher.finished(this) // 停止執行
    }
}

} ```

    • AsyncCall將自己加入到執行緒池,然後執行緒池開啟執行緒執行自己的run方法,那麼AsyncCall加入了一個怎樣的執行緒池呢?

@get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! }

    • 這裡定義了一個快取執行緒池,具有快取功能且如果一個請求執行完畢後在60s內再次發起就會複用剛才那個執行緒,提高了效能。
    • 交給執行緒池後,執行緒池會開啟執行緒執行AsyncCall的run方法

override fun run() { threadName("OkHttp ${redactedUrl()}") { //定義響應標誌位,用於表示請求是否成功 var signalledCallback = false timeout.enter() try { //傳送請求並得到結果 val response = getResponseWithInterceptorChain() //過程未出錯 signalledCallback = true //回撥結果 responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // 不要兩次發出回撥訊號! Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e) } else { //失敗回撥 responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { client.dispatcher.finished(this) } } }

在請求得到結果後最後會呼叫finish表示完成,這裡的finish又做了什麼呢?

``` /* * 由[AsyncCall.run]用於表示完成。 / internal fun finished(call: AsyncCall) { call.callsPerHost.decrementAndGet() finished(runningAsyncCalls, call) }

/* * 由[Call.execute]用於表示完成。 / internal fun finished(call: RealCall) { finished(runningSyncCalls, call) }

private fun finished(calls: Deque, call: T) { val idleCallback: Runnable? synchronized(this) { //將完成的任務從佇列中刪除 if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!") idleCallback = this.idleCallback }

//用於將等待佇列中的請求移入非同步佇列,並交由執行緒池執行
val isRunning = promoteAndExecute()

//如果沒有請求需要執行,回撥閒置callback
if (!isRunning && idleCallback != null) {
    idleCallback.run()
}

} ```

  • 流程圖如下