OkHttp源码分析(一)

语言: CN / TW / HK

theme: cyanosis


开启掘金成长之旅!这是我参与「掘金日新计划 · 12 月更文挑战」的第37天,点击查看活动详情

文章中源码的OkHttp版本为4.10.0

implementation 'com.squareup.okhttp3:okhttp:4.10.0'

1.简单使用

  • okHttp的简单使用代码如下:

``` //创建OkHttpClient对象 val client = OkHttpClient().newBuilder().build()

//创建Request对象 val request = Request.Builder() .url("https://wanandroid.com/wxarticle/list/408/1/json") //添加请求的url地址 .build() //返回一个Request对象

//发起请求 fun request() { val response = client .newCall(request) //创建一个Call对象 .enqueue(object : Callback { //调用enqueue方法执行异步请求 override fun onFailure(call: Call, e: IOException) { TODO("Not yet implemented") }

        override fun onResponse(call: Call, response: Response) {
            TODO("Not yet implemented")
        }
    })
}

```

  • 工作流程有四步:
    • 创建OkHttpClient对象
    • 创建Request对象
    • 创建Call对象
    • 开始发起请求,enqueue为异步请求,execute为同步请求

2.OkHttpClient对象是如何创建的

val client = OkHttpClient().newBuilder().build()

OkHttpClient对象的创建是一个典型的建造者模式,先看一下newBuilder方法做了什么,源码如下:

//创建了一个Builder对象 open fun newBuilder(): Builder = Builder(this)

``` class Builder constructor() { //调度器 internal var dispatcher: Dispatcher = Dispatcher() //拦截器集合 internal val interceptors: MutableList = mutableListOf() //网络拦截器集合 internal val networkInterceptors: MutableList = mutableListOf()

...

} ```

newBuilder中创建了一个Builder对象,Builder对象的构造函数中定义了很多的变量,这里只保留了3个重要的。

下面看一下build方法做了什么

//这个this就是上面创建的Builder对象 fun build(): OkHttpClient = OkHttpClient(this)

okHttpClient源码如下

``` open class OkHttpClient internal constructor( builder: Builder ) : Cloneable, Call.Factory, WebSocket.Factory {

@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher

@get:JvmName("interceptors") val interceptors: List = builder.interceptors.toImmutableList()

@get:JvmName("networkInterceptors") val networkInterceptors: List = builder.networkInterceptors.toImmutableList()

...

} ```

通过OkHttpClient对象的源码可以得知,Builder创建的调度器、拦截器最终都会交给OkHttpClient,这是建造者模式的特定。

3.Request对象是如何创建的

val request = Request.Builder() .url("https://wanandroid.com/wxarticle/list/408/1/json") //添加请求的url地址 .build() //返回一个Request对象

``` open class Builder { //请求地址 internal var url: HttpUrl? = null //请求方法 internal var method: String //请求头 internal var headers: Headers.Builder //请求体 internal var body: RequestBody? = null

...

} ```

4.创建Call对象

val call = client.newCall(request)

override fun newCall(request: Request): Call { //newRealCall中传递了三个参数,第一个参数是OkHttpClient本身,第二个参数request, //第三个不用关注 return RealCall.newRealCall(this, request, forWebSocket = false) }

接着我们先来看一下RealCall是什么

class RealCall( val client: OkHttpClient, /** The application's original request unadulterated by redirects or auth headers. */ val originalRequest: Request, val forWebSocket: Boolean ) : Call { ... }

从源码可知RealCallCall的子类,那么Call又是什么呢,往下看

``` //调用是准备好要执行的请求。也可以取消调用。 //由于该对象表示单个请求/响应对(流),因此不能执行两次。 interface Call : Cloneable { //返回发起此调用的原始请求。 fun request(): Request @Throws(IOException::class)

//同步请求,立即调用请求,并阻塞,直到响应可以处理或出现错误。
fun execute(): Response

//异步请求,接受回调参数
fun enqueue(responseCallback: Callback)

//取消请求
fun cancel()

//如果此调用已被执行或进入队列,则返回true。多次执行调用是错误的。
fun isExecuted(): Boolean

//是否是取消状态
fun isCanceled(): Boolean

//超时时间,
fun timeout(): Timeout

//创建一个与此调用相同的新调用,即使该调用已经进入队列或执行,该调用也可以被加入队列或执行。
public override fun clone(): Call

fun interface Factory {
    fun newCall(request: Request): Call
}

} ```

5.发起请求

  • 以异步请求为例进行分析

``` call.enqueue(object : Callback { override fun onFailure(call: Call, e: IOException) { println("onFailure:$e") }

override fun onResponse(call: Call, response: Response) {
    println("onResponse:${response.body.toString()}")
}

}) ```

RealCallCall的子类所以enqueue的具体实现是在RealCall

``` override fun enqueue(responseCallback: Callback) { //检查是否进行了二次请求 check(executed.compareAndSet(false, true)) { "Already Executed" }

//请求后立即调用,相当于监听请求的开始事件
callStart()
//将请求交给调度器来决定什么时候开始请求
client.dispatcher.enqueue(AsyncCall(responseCallback))

}

private fun callStart() { this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()") eventListener.callStart(this) } ```

  • 疑问:client.dispatcher.enqueue是如何决定什么时候开始请求的
    • 已知client就是OkHttpClient
    • dispatcher是调度器,先来看一下它的源码

``` class Dispatcher constructor() { //并发执行的最大请求数。上面的请求队列在内存中,等待正在运行的调用完成。 //如果在调用这个函数时有超过maxRequests的请求在运行,那么这些请求将保持在运行状态。 @get:Synchronized var maxRequests = 64 set(maxRequests) { require(maxRequests >= 1) { "max < 1: $maxRequests" } synchronized(this) { field = maxRequests } promoteAndExecute() }

//每台主机可并发执行的最大请求数。这限制了URL主机名的请求。
//注意,对单个IP地址的并发请求仍然可能超过此限制:
//多个主机名可能共享一个IP地址或通过同一个HTTP代理路由。
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
    require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
    synchronized(this) {
        field = maxRequestsPerHost
    }
    promoteAndExecute()
}

//线程安全的单例模式,线程池的获取用于线程调度。
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
    if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                   SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
}

//定义准备发送的队列
private val readyAsyncCalls = ArrayDeque<AsyncCall>()

//定义异步发送队列
private val runningAsyncCalls = ArrayDeque<AsyncCall>()

//定义同步发送队列
private val runningSyncCalls = ArrayDeque<RealCall>()

...

} ```

    • 再来看一下dispatcher.enqueue的源码

``` internal fun enqueue(call: AsyncCall) { synchronized(this) { //将call对象添加到准备发送的队列,这个call对象来自AsyncCall,稍后再讲 readyAsyncCalls.add(call)

    //修改AsyncCall,使其共享对同一主机的现有运行调用的AtomicInteger。
    if (!call.get().forWebSocket) {
        //共享一个已经存在的正在运行调用的AtomicInteger
        val existingCall = findExistingCallWithHost(call.host())
        //统计发送数量
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
}
//准备发送请求
promoteAndExecute()

} ```

``` //将符合条件的call从readyAsyncCalls(准备发送的队列)添加到runningAsyncCalls(异步发送队列)中 //并在服务器上执行它们 //不能在同步时调用,因为执行调用可以调用用户代码 //如果调度程序当前正在运行,则为true。 private fun promoteAndExecute(): Boolean { assert(!Thread.holdsLock(this))

//收集所有需要执行的请求
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
    val i = readyAsyncCalls.iterator()
    //遍历准备发送的队列
    while (i.hasNext()) {
        val asyncCall = i.next()

        //判断已经发送的请求是大于等于最大请求个数64,是则跳出循环
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        //判断并发请求个数是否大于等于最大并发个数5,如果是则跳出循环
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

        //从准备队列中删除
        i.remove()
        //计数+1
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        //将对象添加到异步发送队列中
        runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
}

for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    //提交任务到线程池
    asyncCall.executeOn(executorService)
}

return isRunning

} ```

    • 最终请求是通过AsyncCallexecuteOn发送出去的,AsyncCall是什么

internal inner class AsyncCall( private val responseCallback: Callback ) : Runnable { ... }

    • 接收了一个回调,继承了Runnable

``` fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock()

//暂时定义为未执行成功
var success = false
try {
    //使用线程执行自身
    executorService.execute(this)
    //执行成功
    success = true
} catch (e: RejectedExecutionException) {
    val ioException = InterruptedIOException("executor rejected")
    ioException.initCause(e)
    noMoreExchanges(ioException)
    //发送失败
    responseCallback.onFailure(this@RealCall, ioException)
} finally {
    if (!success) {
        //发送结束
        client.dispatcher.finished(this) // 停止运行
    }
}

} ```

    • AsyncCall将自己加入到线程池,然后线程池开启线程执行自己的run方法,那么AsyncCall加入了一个怎样的线程池呢?

@get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! }

    • 这里定义了一个缓存线程池,具有缓存功能且如果一个请求执行完毕后在60s内再次发起就会复用刚才那个线程,提高了性能。
    • 交给线程池后,线程池会开启线程执行AsyncCall的run方法

override fun run() { threadName("OkHttp ${redactedUrl()}") { //定义响应标志位,用于表示请求是否成功 var signalledCallback = false timeout.enter() try { //发送请求并得到结果 val response = getResponseWithInterceptorChain() //过程未出错 signalledCallback = true //回调结果 responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // 不要两次发出回调信号! Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e) } else { //失败回调 responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { client.dispatcher.finished(this) } } }

在请求得到结果后最后会调用finish表示完成,这里的finish又做了什么呢?

``` /* * 由[AsyncCall.run]用于表示完成。 / internal fun finished(call: AsyncCall) { call.callsPerHost.decrementAndGet() finished(runningAsyncCalls, call) }

/* * 由[Call.execute]用于表示完成。 / internal fun finished(call: RealCall) { finished(runningSyncCalls, call) }

private fun finished(calls: Deque, call: T) { val idleCallback: Runnable? synchronized(this) { //将完成的任务从队列中删除 if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!") idleCallback = this.idleCallback }

//用于将等待队列中的请求移入异步队列,并交由线程池执行
val isRunning = promoteAndExecute()

//如果没有请求需要执行,回调闲置callback
if (!isRunning && idleCallback != null) {
    idleCallback.run()
}

} ```

  • 流程图如下