OkHttp原始碼分析(一)
theme: cyanosis
開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第37天,點選檢視活動詳情
文章中原始碼的OkHttp版本為4.10.0
implementation 'com.squareup.okhttp3:okhttp:4.10.0'
1.簡單使用
- okHttp的簡單使用程式碼如下:
``` //建立OkHttpClient物件 val client = OkHttpClient().newBuilder().build()
//建立Request物件 val request = Request.Builder() .url("https://wanandroid.com/wxarticle/list/408/1/json") //新增請求的url地址 .build() //返回一個Request物件
//發起請求 fun request() { val response = client .newCall(request) //建立一個Call物件 .enqueue(object : Callback { //呼叫enqueue方法執行非同步請求 override fun onFailure(call: Call, e: IOException) { TODO("Not yet implemented") }
override fun onResponse(call: Call, response: Response) {
TODO("Not yet implemented")
}
})
}
```
- 工作流程有四步:
- 建立OkHttpClient物件
- 建立Request物件
- 建立Call物件
- 開始發起請求,
enqueue
為非同步請求,execute
為同步請求
2.OkHttpClient物件是如何建立的
val client = OkHttpClient().newBuilder().build()
OkHttpClient
物件的建立是一個典型的建造者模式,先看一下newBuilder
方法做了什麼,原始碼如下:
//建立了一個Builder物件
open fun newBuilder(): Builder = Builder(this)
```
class Builder constructor() {
//排程器
internal var dispatcher: Dispatcher = Dispatcher()
//攔截器集合
internal val interceptors: MutableList
...
} ```
newBuilder
中建立了一個Builder
物件,Builder
物件的建構函式中定義了很多的變數,這裡只保留了3個重要的。
下面看一下build
方法做了什麼
//這個this就是上面建立的Builder物件
fun build(): OkHttpClient = OkHttpClient(this)
okHttpClient
原始碼如下
``` open class OkHttpClient internal constructor( builder: Builder ) : Cloneable, Call.Factory, WebSocket.Factory {
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
@get:JvmName("interceptors") val interceptors: List
@get:JvmName("networkInterceptors") val networkInterceptors: List
...
} ```
通過OkHttpClient
物件的原始碼可以得知,Builder
建立的排程器、攔截器最終都會交給OkHttpClient
,這是建造者模式的特定。
3.Request物件是如何建立的
val request = Request.Builder()
.url("https://wanandroid.com/wxarticle/list/408/1/json") //新增請求的url地址
.build() //返回一個Request物件
``` open class Builder { //請求地址 internal var url: HttpUrl? = null //請求方法 internal var method: String //請求頭 internal var headers: Headers.Builder //請求體 internal var body: RequestBody? = null
...
} ```
4.建立Call物件
val call = client.newCall(request)
override fun newCall(request: Request): Call {
//newRealCall中傳遞了三個引數,第一個引數是OkHttpClient本身,第二個引數request,
//第三個不用關注
return RealCall.newRealCall(this, request, forWebSocket = false)
}
接著我們先來看一下RealCall
是什麼
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
...
}
從原始碼可知RealCall
是Call
的子類,那麼Call
又是什麼呢,往下看
``` //呼叫是準備好要執行的請求。也可以取消呼叫。 //由於該物件表示單個請求/響應對(流),因此不能執行兩次。 interface Call : Cloneable { //返回發起此呼叫的原始請求。 fun request(): Request @Throws(IOException::class)
//同步請求,立即呼叫請求,並阻塞,直到響應可以處理或出現錯誤。
fun execute(): Response
//非同步請求,接受回撥引數
fun enqueue(responseCallback: Callback)
//取消請求
fun cancel()
//如果此呼叫已被執行或進入佇列,則返回true。多次執行呼叫是錯誤的。
fun isExecuted(): Boolean
//是否是取消狀態
fun isCanceled(): Boolean
//超時時間,
fun timeout(): Timeout
//建立一個與此呼叫相同的新呼叫,即使該呼叫已經進入佇列或執行,該呼叫也可以被加入佇列或執行。
public override fun clone(): Call
fun interface Factory {
fun newCall(request: Request): Call
}
} ```
5.發起請求
- 以非同步請求為例進行分析
``` call.enqueue(object : Callback { override fun onFailure(call: Call, e: IOException) { println("onFailure:$e") }
override fun onResponse(call: Call, response: Response) {
println("onResponse:${response.body.toString()}")
}
}) ```
RealCall
是Call
的子類所以enqueue
的具體實現是在RealCall
中
``` override fun enqueue(responseCallback: Callback) { //檢查是否進行了二次請求 check(executed.compareAndSet(false, true)) { "Already Executed" }
//請求後立即呼叫,相當於監聽請求的開始事件
callStart()
//將請求交給排程器來決定什麼時候開始請求
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
private fun callStart() { this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()") eventListener.callStart(this) } ```
- 疑問:
client.dispatcher.enqueue
是如何決定什麼時候開始請求的
-
- 已知
client
就是OkHttpClient
- 已知
-
dispatcher
是排程器,先來看一下它的原始碼
``` class Dispatcher constructor() { //併發執行的最大請求數。上面的請求佇列在記憶體中,等待正在執行的呼叫完成。 //如果在呼叫這個函式時有超過maxRequests的請求在執行,那麼這些請求將保持在執行狀態。 @get:Synchronized var maxRequests = 64 set(maxRequests) { require(maxRequests >= 1) { "max < 1: $maxRequests" } synchronized(this) { field = maxRequests } promoteAndExecute() }
//每臺主機可併發執行的最大請求數。這限制了URL主機名的請求。
//注意,對單個IP地址的併發請求仍然可能超過此限制:
//多個主機名可能共享一個IP地址或通過同一個HTTP代理路由。
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}
//執行緒安全的單例模式,執行緒池的獲取用於執行緒排程。
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
//定義準備傳送的佇列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
//定義非同步傳送佇列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
//定義同步傳送佇列
private val runningSyncCalls = ArrayDeque<RealCall>()
...
} ```
-
- 再來看一下
dispatcher.enqueue
的原始碼
- 再來看一下
``` internal fun enqueue(call: AsyncCall) { synchronized(this) { //將call物件新增到準備傳送的佇列,這個call物件來自AsyncCall,稍後再講 readyAsyncCalls.add(call)
//修改AsyncCall,使其共享對同一主機的現有執行呼叫的AtomicInteger。
if (!call.get().forWebSocket) {
//共享一個已經存在的正在執行呼叫的AtomicInteger
val existingCall = findExistingCallWithHost(call.host())
//統計傳送數量
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//準備傳送請求
promoteAndExecute()
} ```
``` //將符合條件的call從readyAsyncCalls(準備傳送的佇列)新增到runningAsyncCalls(非同步傳送佇列)中 //並在伺服器上執行它們 //不能在同步時呼叫,因為執行呼叫可以呼叫使用者程式碼 //如果排程程式當前正在執行,則為true。 private fun promoteAndExecute(): Boolean { assert(!Thread.holdsLock(this))
//收集所有需要執行的請求
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
//遍歷準備傳送的佇列
while (i.hasNext()) {
val asyncCall = i.next()
//判斷已經發送的請求是大於等於最大請求個數64,是則跳出迴圈
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//判斷併發請求個數是否大於等於最大併發個數5,如果是則跳出迴圈
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
//從準備佇列中刪除
i.remove()
//計數+1
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
//將物件新增到非同步傳送佇列中
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
//提交任務到執行緒池
asyncCall.executeOn(executorService)
}
return isRunning
} ```
-
- 最終請求是通過
AsyncCall
的executeOn
傳送出去的,AsyncCall
是什麼
- 最終請求是通過
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
...
}
-
- 接收了一個回撥,繼承了Runnable
``` fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock()
//暫時定義為未執行成功
var success = false
try {
//使用執行緒執行自身
executorService.execute(this)
//執行成功
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
//傳送失敗
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
//傳送結束
client.dispatcher.finished(this) // 停止執行
}
}
} ```
-
AsyncCall
將自己加入到執行緒池,然後執行緒池開啟執行緒執行自己的run方法,那麼AsyncCall
加入了一個怎樣的執行緒池呢?
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
-
- 這裡定義了一個快取執行緒池,具有快取功能且如果一個請求執行完畢後在60s內再次發起就會複用剛才那個執行緒,提高了效能。
- 交給執行緒池後,執行緒池會開啟執行緒執行AsyncCall的run方法
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
//定義響應標誌位,用於表示請求是否成功
var signalledCallback = false
timeout.enter()
try {
//傳送請求並得到結果
val response = getResponseWithInterceptorChain()
//過程未出錯
signalledCallback = true
//回撥結果
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// 不要兩次發出回撥訊號!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
//失敗回撥
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
在請求得到結果後最後會呼叫finish
表示完成,這裡的finish
又做了什麼呢?
``` /* * 由[AsyncCall.run]用於表示完成。 / internal fun finished(call: AsyncCall) { call.callsPerHost.decrementAndGet() finished(runningAsyncCalls, call) }
/* * 由[Call.execute]用於表示完成。 / internal fun finished(call: RealCall) { finished(runningSyncCalls, call) }
private fun
//用於將等待佇列中的請求移入非同步佇列,並交由執行緒池執行
val isRunning = promoteAndExecute()
//如果沒有請求需要執行,回撥閒置callback
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
} ```
- 流程圖如下