OkHttp Source Code and Flow Analysis


Preface

OkHttp is a networking framework from Square and one of the most widely used HTTP clients today. The well-known Retrofit is built on top of it. Its former underlying IO component has since been spun off into an excellent standalone open-source IO library, Okio; see Okio Source Code and Flow Analysis for a walkthrough of that framework.

OkHttp is an efficient HTTP client that loads network content faster while saving bandwidth:

  • It supports HTTP/2, allowing all requests to the same host to share a single socket
  • When HTTP/2 is unavailable, a connection pool reduces request latency
  • GZIP compression shrinks the amount of data transferred
  • Response caching avoids repeating network requests

A simple use of OkHttp looks like this:

```
fun okHttpCall(name: String) {
    // Create the OkHttpClient instance
    val client = OkHttpClient()
    // Create the Request object
    var request: Request = Request.Builder()
        .url("http://api.github.com/users/$name/repos")
        .method("GET", null)
        .build()
    // Synchronous request: obtain the Response directly
    val response = client.newCall(request).execute()
    println(response.body()?.source()?.readUtf8())

    // Asynchronous request: obtain the Response through a callback
    client.newCall(request).enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            e.printStackTrace()
        }

        override fun onResponse(call: Call, response: Response) {
            println(response.body()?.source()?.readUtf8())
        }
    })
}
```
Making a network request with OkHttp is just as concise: create an OkHttpClient, create a Request, and obtain a Response.

We can roughly guess that Request carries our request information, including the URL, HTTP method and parameters, while OkHttpClient is responsible for sending that request to the target server and receiving the returned Response.

The Basic Roles

The sample code above showed both the synchronous and the asynchronous style of request. Let's start from the simplest case, the synchronous request; trimmed down, the flow is:

```
fun synchronizeCall(name: String) {
    var request: Request = Request.Builder()
        .url("http://api.github.com/users/$name/repos")
        .method("GET", null)
        .build()
    val client = OkHttpClient()
    val response = client.newCall(request).execute()
    println(response.body()?.source()?.readUtf8())
}
```

The flow has four steps:

  • build the Request;
  • obtain an OkHttpClient instance;
  • send the request and get a Response;
  • parse the Response data.

Let's look at the Request code first:

Request

```
public final class Request {
  // Request URL
  final HttpUrl url;
  // HTTP method, e.g. GET, POST...
  final String method;
  // Request headers
  final Headers headers;
  // Request body
  final @Nullable RequestBody body;

  Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tags = Util.immutableMap(builder.tags);
  }

  public HttpUrl url() { return url; }

  //..
  public @Nullable String header(String name) { return headers.get(name); }

  public List<String> headers(String name) { return headers.values(name); }

  public @Nullable RequestBody body() { return body; }

  //..
  public Builder newBuilder() { return new Builder(this); }

  public static class Builder {
    @Nullable HttpUrl url;
    String method;
    Headers.Builder headers;
    @Nullable RequestBody body;

public Builder() {
  this.method = "GET";
  this.headers = new Headers.Builder();
}

Builder(Request request) {
  this.url = request.url;
  this.method = request.method;
  this.body = request.body;
  this.tags = request.tags.isEmpty()
      ? Collections.<Class<?>, Object>emptyMap()
      : new LinkedHashMap<>(request.tags);
  this.headers = request.headers.newBuilder();
}

public Builder url(HttpUrl url) {
  if (url == null) throw new NullPointerException("url == null");
  this.url = url;
  return this;
}

  }
}
```
Request's role and responsibility are clear: it is the carrier of the request, holding all of the request information, mainly the URL, the HTTP method, the headers and the body. It is built with the Builder pattern, through its static inner class Builder.

OkHttpClient

Next comes OkHttpClient, which also uses the Builder pattern.

```
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {

  // Dispatcher
  final Dispatcher dispatcher;
  // Interceptors
  final List<Interceptor> interceptors;
  final List<Interceptor> networkInterceptors;
  final EventListener.Factory eventListenerFactory;
  final ProxySelector proxySelector;
  // Cookie settings
  final CookieJar cookieJar;
  // Cache settings
  final @Nullable Cache cache;
  final @Nullable InternalCache internalCache;
  final SocketFactory socketFactory;
  final SSLSocketFactory sslSocketFactory;
  final CertificateChainCleaner certificateChainCleaner;
  final HostnameVerifier hostnameVerifier;
  final CertificatePinner certificatePinner;
  final Authenticator proxyAuthenticator;
  final Authenticator authenticator;
  // Connection pool
  final ConnectionPool connectionPool;
  // DNS settings
  final Dns dns;
  // Whether to follow redirects from HTTP to HTTPS
  final boolean followSslRedirects;
  final boolean followRedirects;
  final boolean retryOnConnectionFailure;
  // Call, connect, read and write timeouts
  final int callTimeout;
  final int connectTimeout;
  final int readTimeout;
  final int writeTimeout;

  public OkHttpClient() {
    this(new Builder());
  }

  OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;
    this.proxy = builder.proxy;
    this.protocols = builder.protocols;
    //..
    this.connectTimeout = builder.connectTimeout;
    this.readTimeout = builder.readTimeout;
    this.writeTimeout = builder.writeTimeout;
    this.pingInterval = builder.pingInterval;
  }

  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

  public static final class Builder {
    Dispatcher dispatcher;
    /* the fields largely mirror the outer class */
    int callTimeout;
    int connectTimeout;
    int readTimeout;
    int writeTimeout;
    int pingInterval;

public Builder() {
  dispatcher = new Dispatcher();
  protocols = DEFAULT_PROTOCOLS;
  connectionSpecs = DEFAULT_CONNECTION_SPECS;
  eventListenerFactory = EventListener.factory(EventListener.NONE);
  //..
  readTimeout = 10_000;
  writeTimeout = 10_000;
  pingInterval = 0;
}

Builder(OkHttpClient okHttpClient) {
  this.dispatcher = okHttpClient.dispatcher;
  //..
  this.pingInterval = okHttpClient.pingInterval;
}

  }
}
```

The trimmed-down OkHttpClient code is shown above. It has a great many member fields, mainly covering:

  • sending requests: the dispatcher, the connection pool and the interceptors;
  • basic HTTP configuration: cache, DNS, proxy, redirects, timeouts and so on.

Like Request, it uses the Builder pattern and holds a large amount of configuration. The difference is that a Request's configuration serves one specific network request, whereas OkHttpClient carries the base HTTP configuration for a whole batch of requests (although you could create a separate OkHttpClient with different settings for every single request). As a real-world analogy, think of OkHttpClient as the railway system and each Request as a railway car. The railway system decides which track a car runs on (thread scheduling) and sets the loading, unloading and transit times (read, write and call timeouts) and other shared rules; what cargo is carried (headers, body), where it goes (url) and how it is pulled (method) is entirely up to the car (Request). Of course, a railway network only runs smoothly with a central dispatching office, and that job belongs to the Dispatcher.
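To make the division of labour concrete, here is a minimal Kotlin sketch under the OkHttp 3.x API used in this article; the GitHub URLs are only placeholders. One OkHttpClient carries the shared configuration, while each Request only describes its own trip:

```
import okhttp3.OkHttpClient
import okhttp3.Request
import java.util.concurrent.TimeUnit

// One client holds the shared "railway" configuration (timeouts, pool, dispatcher)...
val sharedClient: OkHttpClient = OkHttpClient.Builder()
    .connectTimeout(10, TimeUnit.SECONDS)
    .readTimeout(10, TimeUnit.SECONDS)
    .build()

// ...while each Request only describes its own "cargo": url, method, headers.
val reposRequest = Request.Builder()
    .url("https://api.github.com/users/square/repos")
    .build()
val userRequest = Request.Builder()
    .url("https://api.github.com/users/square")
    .build()

fun fetchBoth() {
    // Both calls share the same connection pool and dispatcher.
    sharedClient.newCall(reposRequest).execute().use { println(it.code()) }
    sharedClient.newCall(userRequest).execute().use { println(it.code()) }
}
```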

Dispatcher

The Dispatcher is held by OkHttpClient and determines the execution policy for asynchronous requests. Each dispatcher owns an ExecutorService used to run Calls, so we can already guess that the Dispatcher schedules work on a thread pool. Its main structure is:

```
public final class Dispatcher {
  // Maximum number of requests executed concurrently
  private int maxRequests = 64;
  // Maximum number of concurrent requests per host
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  // Thread pool
  private @Nullable ExecutorService executorService;

  // Asynchronous calls that are ready and waiting to be run
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  // Asynchronous calls currently running
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  // Synchronous calls currently running
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  // Returns the thread pool, creating it lazily on first access
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
}
```
The dispatcher keeps several ArrayDeque double-ended queues to hold requests in their different states (synchronous/asynchronous, ready/running), and it provides a thread pool. That pool has no core threads, its maximum thread count of Integer.MAX_VALUE is effectively unbounded, and it uses a SynchronousQueue as its work queue.

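If the defaults do not suit you, both limits can be adjusted on an existing client; a small sketch against the Dispatcher's public setters in the OkHttp 3.x API shown here:

```
import okhttp3.OkHttpClient

val client = OkHttpClient()
// Defaults are 64 total and 5 per host.
client.dispatcher().setMaxRequests(32)        // total concurrent asynchronous calls
client.dispatcher().setMaxRequestsPerHost(8)  // concurrent asynchronous calls per host
```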
On top of that, the railway system (OkHttpClient) also has to supply the engine (Call) that gives the cars their motive power. We hitch the loaded cars to the engine and order it to depart before any freight actually moves.

Call

To run a synchronous request we first obtain a Call via client.newCall(request) and then invoke its execute() method. client.newCall looks like this:

@Override public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */); }

This method returns a RealCall instance, which implements the Call interface.

```
public interface Call extends Cloneable {
  Request request();
  // Synchronous request
  Response execute() throws IOException;
  // Asynchronous request
  void enqueue(Callback responseCallback);
  void cancel();
  // Whether the request has been executed
  boolean isExecuted();
  boolean isCanceled();
}
```
The interface defines synchronous and asynchronous execution, cancellation, execution state and cloning; this is where a request is actually carried out, and its only implementation is RealCall. The next step is for the dispatcher to give the departure signal, but we will walk through that whole workflow shortly. First, a look at Response.

Response

Whether the request is synchronous or asynchronous, what we ultimately get back is a Response. It has the following member fields:

```
public final class Response implements Closeable {
  final Request request;
  final Protocol protocol;
  final int code;
  final String message;
  final @Nullable Handshake handshake;
  final Headers headers;
  final @Nullable ResponseBody body;
  final @Nullable Response networkResponse;
  final @Nullable Response cacheResponse;
  final @Nullable Response priorResponse;
  final long sentRequestAtMillis;
  final long receivedResponseAtMillis;

  private volatile @Nullable CacheControl cacheControl; // Lazily initialized.
}
```
As you can see, Response contains the original Request, the HTTP headers returned by the server, the status code and message, and the body we actually want. ResponseBody itself is a one-shot stream from the server to the client whose content is stored as bytes; in its implementation RealResponseBody, those bytes are buffered in a BufferedSource. This touches on Okio; see the section on Buffer read/write operations in Okio Source Code and Flow Analysis for details.
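As a quick illustration of consuming a Response (and of the fact that the body is a one-shot stream), here is a minimal sketch; the URL is just an example:

```
import okhttp3.OkHttpClient
import okhttp3.Request

fun printRepos() {
    val client = OkHttpClient()
    val request = Request.Builder()
        .url("https://api.github.com/users/square/repos")
        .build()
    client.newCall(request).execute().use { response ->
        println("code=${response.code()} message=${response.message()}")
        println("server header=${response.header("Server")}")
        // body() is a one-shot stream; string() reads it fully and closes the source
        println(response.body()?.string())
    }
}
```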

Main Flow Analysis

The previous section briefly introduced OkHttp's main players. Now we analyze the main flow of a network request, focusing on thread allocation, issuing the request and handling the response; we won't dig further into how the Request and the Client are built.

Start with the synchronous request, that is, RealCall.execute():

```
@Override public Response execute() throws IOException {
  synchronized (this) {
    // Ensure the call runs only once; throw if it has already been executed
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  timeout.enter();
  // Start the request event listener
  eventListener.callStart(this);
  try {
    // Add this Call to the dispatcher's runningSyncCalls queue,
    // marking the request as in flight
    client.dispatcher().executed(this);
    // Issue the request and obtain the result
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } catch (IOException e) {
    e = timeoutExit(e);
    eventListener.callFailed(this, e);
    throw e;
  } finally {
    client.dispatcher().finished(this);
  }
}
```

The logic is spelled out in the comments. For a synchronous call the Dispatcher merely adds the Call to runningSyncCalls and removes it once the request finishes.

We will hold off on the logic inside getResponseWithInterceptorChain() for now, because the asynchronous path ends up there too and we will analyze it in depth later. First the asynchronous flow, that is, RealCall.enqueue():

```
@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  // Start the request event listener
  eventListener.callStart(this);
  // Wrap the callback in an AsyncCall and add it to the readyAsyncCalls queue
  // through the dispatcher's enqueue method
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
```
The method takes a Callback, usually created as an anonymous inner class, whose two methods onResponse and onFailure correspond to success and failure. It first checks whether the call has already been executed and marks it as executed, then starts the event listener, just like the synchronous path. What follows differs: an AsyncCall instance is created and handed, via the OkHttpClient, to the dispatcher's enqueue method. From there it is all thread scheduling and request execution.

Thread Scheduling

The code and flow of Dispatcher.enqueue:

```
void enqueue(AsyncCall call) {
  synchronized (this) {
    readyAsyncCalls.add(call);
  }
  promoteAndExecute();
}

private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall asyncCall = i.next();

    if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
    if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

    i.remove();
    executableCalls.add(asyncCall);
    runningAsyncCalls.add(asyncCall);
}
    isRunning = runningCallsCount() > 0;
}

for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());
}
return isRunning;

}
```

Dispatcher.enqueue adds the AsyncCall to the readyAsyncCalls queue, marking it as ready to run, and then calls promoteAndExecute. That method walks every pending AsyncCall in readyAsyncCalls while checking the number of requests currently in flight (at most 64) and the number of requests to the same host (at most 5); every call that fits is moved out of the ready queue and into runningAsyncCalls. Finally AsyncCall.executeOn is invoked for each promoted call.

Note: you might wonder why OkHttp goes to the trouble of keeping the readyAsyncCalls queue at all, and of locking it for thread safety. The point is to make it easy to count all in-flight requests and the requests per host, and to be able to cancel every request.

The flow now returns to AsyncCall, an inner class of RealCall. Every asynchronous request hands an AsyncCall instance to the Dispatcher; after scheduling it, the dispatcher runs it through AsyncCall.executeOn, passing in the thread pool obtained from executorService() (described in the Dispatcher section above).

The source and flow of AsyncCall.executeOn:

```
final class AsyncCall extends NamedRunnable {
    // Callback that receives the request result
    private final Callback responseCallback;

AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
}

// Start execution: submit this task to the given thread pool
void executeOn(ExecutorService executorService) {
    assert (!Thread.holdsLock(client.dispatcher()));
    boolean success = false;
    try {
        executorService.execute(this);
        success = true;
    } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        eventListener.callFailed(RealCall.this, ioException);
        responseCallback.onFailure(RealCall.this, ioException);
    } finally {
        if (!success) {
            client.dispatcher().finished(this); // This call is no longer running!
        }
    }
}

// Where the task actually runs
@Override protected void execute() {
    boolean signalledCallback = false;
    timeout.enter();
    try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
            signalledCallback = true;
            responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
            signalledCallback = true;
            responseCallback.onResponse(RealCall.this, response);
        }
    } catch (IOException e) {
        e = timeoutExit(e);
        if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
            eventListener.callFailed(RealCall.this, e);
            responseCallback.onFailure(RealCall.this, e);
        }
    } finally {
        client.dispatcher().finished(this);
    }
}

}
```
AsyncCall's parent class NamedRunnable is an abstract class that implements the Runnable interface and overrides run; run in turn calls execute:

```
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
```

This is a classic thread-pool pattern: AsyncCall is itself a Runnable, and executeOn submits it as a task to the thread pool supplied by the Dispatcher. When the pool runs the AsyncCall, the overridden run method invokes AsyncCall.execute(), and execute(), just like the synchronous path above, does its work through getResponseWithInterceptorChain.

To summarize: synchronous and asynchronous requests both go through RealCall, are managed by the dispatcher, and are ultimately issued by getResponseWithInterceptorChain. The asynchronous path adds one step of thread scheduling: RealCall's inner class AsyncCall, which implements Runnable, is submitted to the Dispatcher's thread pool so that getResponseWithInterceptorChain runs on a pool thread. That completes the overall picture of the request flow.
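One practical consequence of this thread switch is that enqueue's callback runs on a dispatcher pool thread rather than the caller's thread; a small sketch (the client and request are assumed to be built elsewhere):

```
import okhttp3.Call
import okhttp3.Callback
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import java.io.IOException

fun asyncExample(client: OkHttpClient, request: Request) {
    client.newCall(request).enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) = e.printStackTrace()

        override fun onResponse(call: Call, response: Response) {
            // Runs on an "OkHttp Dispatcher" pool thread, not the calling thread,
            // so UI updates must be posted back to the main thread explicitly.
            response.use { println("${Thread.currentThread().name}: ${it.code()}") }
        }
    })
}
```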

The Chain of Responsibility and Interceptors

OkHttp's real core is its interceptors. Interceptor is arguably the most important piece of OkHttp: it carries the core functionality and also provides a hook for user-defined behaviour. You have probably used the logging interceptor HttpLoggingInterceptor; adding it is as simple as val client = OkHttpClient().newBuilder().addInterceptor(HttpLoggingInterceptor()).build(). You can also implement the Interceptor interface yourself, for example an interceptor that prints the request information:

```
class RequestInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        val request = chain.request()
        Log.e("RequestInterceptor", "-----------${Thread.currentThread().name}")
        Log.e("Request-Method", "-----------${request.method()}")
        Log.e("Request-Host", "-----------${request.url()}")
        for (headName in request.headers().names()) {
            Log.e("Request-Head:$headName", "-----------${request.header(headName)}")
        }
        Log.e("Request-Body", "-----------${request.body()}")
        return chain.proceed(request)
    }
}
```

All you need to do is override intercept and add the interceptor to the OkHttpClient via addInterceptor. Interceptors let users do common work before a request goes out, such as printing information, adding cookies, logging or blocking requests; in fact the whole network request is carried out by interceptors. Through the chain-of-responsibility pattern, OkHttp neatly splits the work of a request into separate tasks, each Interceptor caring only about its own piece. The interface is defined as:

```
public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {

  }
}
```

Interceptor is an interface defining a single method, intercept(Chain chain), plus an inner interface, Chain.

Now the getResponseWithInterceptorChain method mentioned above:

```
Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  // Note ①
  interceptors.addAll(client.interceptors());
  interceptors.add(retryAndFollowUpInterceptor);
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  // Note ②
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

  Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
      originalRequest, this, eventListener, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  return chain.proceed(originalRequest);
}
```

The main logic is to gather all the Interceptors into one list, create a RealInterceptorChain, and run its proceed method. client.interceptors() at note ① are the interceptors added through addInterceptor, while client.networkInterceptors() at note ② are added through addNetworkInterceptor; we will come back to the difference between them. First, what each interceptor does:

  • Custom interceptors (application interceptors): interceptors supplied by the user.
  • RetryAndFollowUpInterceptor: retries failed requests and issues follow-up requests for redirects; it also does some initialization work for the connection.
  • BridgeInterceptor: the bridge between client and server; it turns the user-built request into the request the server expects.
  • CacheInterceptor: caching is configured through OkHttpClient.cache; this interceptor uses a CacheStrategy to decide whether the response is built from the network or from the cache.
  • ConnectInterceptor: establishes the actual connection between client and server.
  • Network interceptors (networkInterceptors): user-defined just like interceptors; the difference comes from their position. Application interceptors sit at the front of the chain, so they always run, exactly once per call. Network interceptors sit further back, so they may not run at all if an earlier interceptor throws, or they may run several times when RetryAndFollowUpInterceptor keeps retrying.
  • CallServerInterceptor: sends the request over the network and parses the data the server returns.

Next, how RealInterceptorChain processes the interceptors. RealInterceptorChain implements the Interceptor.Chain interface defined inside Interceptor. After trimming, the key code that strings all the interceptors together is:

```
public final class RealInterceptorChain implements Interceptor.Chain {
  private final List<Interceptor> interceptors;
  private final StreamAllocation streamAllocation;
  private final RealConnection connection;
  private final int index;

  // Constructor
  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
      EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.connection = connection;
    this.streamAllocation = streamAllocation;
    this.index = index;
    this.request = request;
  }

  @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    // Make sure the index is still within the interceptors list
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // Create a new RealInterceptorChain with essentially the same arguments,
    // except that the index is advanced by one
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    // Fetch the interceptor at the current index and run its intercept method
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    return response;
  }
}
```

The execution of the interceptors is driven by RealInterceptorChain.proceed. A RealInterceptorChain is created and its proceed method is called (starting in getResponseWithInterceptorChain); proceed looks up the interceptor at the ever-increasing index in the interceptors list, builds a new RealInterceptorChain whose index is one greater, and passes that chain to the interceptor's intercept method. Since intercept must return a Response and can only obtain one by calling proceed on the chain it was given, each interceptor in turn drives the next one, and the whole list is invoked as a chain. The overall call sequence simply walks the interceptors in the order they were added (user-defined interceptors omitted).
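To make that recursion easier to see, here is a toy chain of responsibility that is independent of OkHttp; the Step and ToyChain names are made up purely for illustration:

```
// A toy chain of responsibility showing how RealInterceptorChain.proceed()
// advances an index and recurses through the list of handlers.
interface Step {
    fun run(chain: ToyChain): String
}

class ToyChain(private val steps: List<Step>, private val index: Int, val input: String) {
    fun proceed(input: String): String {
        // Build the chain handed to the next step, with the index moved forward by one.
        val next = ToyChain(steps, index + 1, input)
        return steps[index].run(next)
    }
}

fun demo() {
    val shout = object : Step {
        override fun run(chain: ToyChain) = chain.proceed(chain.input.uppercase())
    }
    val terminal = object : Step {
        override fun run(chain: ToyChain) = "handled: ${chain.input}"
    }
    // Prints "handled: HELLO": shout rewrites the input, terminal ends the chain.
    println(ToyChain(listOf(shout, terminal), 0, "hello").proceed("hello"))
}
```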

Now let's look at what each interceptor does. The core of every interceptor lives in its intercept method, so that is what we focus on next, starting with RetryAndFollowUpInterceptor:

RetryAndFollowUpInterceptor

As the name suggests, RetryAndFollowUpInterceptor retries failed requests and issues the follow-up requests needed for redirects. Its code is as follows (see the full source for the rest):

```
@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    // Step ①
    // Create the StreamAllocation instance
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
    createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;

int followUpCount = 0;
Response priorResponse = null;
while (true) {
    if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
    }

    Response response;
    boolean releaseConnection = true;
    try {
        // Step ②
        // Call the next interceptor through the Chain
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
    } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
            throw e.getFirstConnectException();
        }
        releaseConnection = false;
        continue;
    } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
    } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
            streamAllocation.streamFailed(null);
            streamAllocation.release();
        }
    }

    //..

    Request followUp;
    try {
        // Step ③
        // Decide whether a follow-up request (retry/redirect) is needed
        followUp = followUpRequest(response, streamAllocation.route());
    } catch (IOException e) {
        streamAllocation.release();
        throw e;
    }

    if (followUp == null) {
        // Step ④
        // No follow-up needed: release resources and return the response
        streamAllocation.release();
        return response;
    }


    // Count the follow-ups and throw once the limit (MAX_FOLLOW_UPS) is exceeded
    if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }

    if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
    }

    if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
    } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
                + " didn't close its backing stream. Bad interceptor?");
    }
    // Swap in the follow-up request and loop again
    request = followUp;
    priorResponse = response;
}

}
```

RetryAndFollowUpInterceptor implements retrying with a while loop; the loop continues when either of the following holds:

  • an exception is thrown from inside the request (a later interceptor failed) and it is judged recoverable (the try/catch at step ②);
  • the response code indicates that a new follow-up request must be built and sent (the check at step ③).

MAX_FOLLOW_UPS caps the number of retries. Note that everything after step ② can only run once all the later interceptors have finished and either returned a result or thrown. Also note that if OkHttpClient's retryOnConnectionFailure is set to false, or the request body has already been sent, the request will not be retried.

This is also where the difference between interceptors and networkInterceptors comes from: interceptors sit before RetryAndFollowUpInterceptor, networkInterceptors after it. Application interceptors are at the very front of the chain, so they are guaranteed to run, exactly once. Network interceptors are not: they may never run (an earlier interceptor threw and the request was aborted), or they may run several times (RetryAndFollowUpInterceptor retried or followed redirects repeatedly).
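A minimal sketch of registering both kinds of interceptor; the logging bodies are only placeholders:

```
import okhttp3.Interceptor
import okhttp3.OkHttpClient

// Application interceptor: always runs, exactly once per call.
val appLogger = Interceptor { chain ->
    println("application -> ${chain.request().url()}")
    chain.proceed(chain.request())
}

// Network interceptor: runs once per network round trip, so 0..n times per call.
val netLogger = Interceptor { chain ->
    println("network     -> ${chain.request().url()}")
    chain.proceed(chain.request())
}

val loggingClient = OkHttpClient.Builder()
    .addInterceptor(appLogger)
    .addNetworkInterceptor(netLogger)
    .build()
```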

Also note that RetryAndFollowUpInterceptor does more than retries and redirects: it creates the StreamAllocation instance and passes it to the later interceptors and RealInterceptorChains through RealInterceptorChain.proceed(). Only from this interceptor onward is the chain's streamAllocation field non-null. StreamAllocation holds the connection pool and is responsible for creating and managing connections; a later section is devoted to it.

BridgeInterceptor

BridgeInterceptor (full source) is the bridge between client and server: it converts the user-built request into the request the server expects:

```
@Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

// Populate the various request headers
RequestBody body = userRequest.body();
if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
    }

 // Content length and transfer encoding
    long contentLength = body.contentLength();
    if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
    } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
    }
}

if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
}

boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
}

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
// Add cookie information
if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
}

if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
}

Response networkResponse = chain.proceed(requestBuilder.build());

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
// Post-process the Response
Response.Builder responseBuilder = networkResponse.newBuilder()
    .request(userRequest);

if (transparentGzip
    && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
    && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    responseBuilder.headers(strippedHeaders);
    String contentType = networkResponse.header("Content-Type");
    responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}

return responseBuilder.build();

}
```

As the comments show, the BridgeInterceptor does the following:

  1. sets request headers such as Content-Type, Host and Connection: Keep-Alive
  2. adds cookies and sets the content length (Content-Length) or encoding (Transfer-Encoding)
  3. converts the server's response into a more convenient form
  4. handles the server's compressed (gzip) response; see the sketch below
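Point 4 relies on Okio's gzip primitives. A tiny self-contained sketch of the same unwrapping idea (this is not OkHttp code, just the Okio building blocks BridgeInterceptor uses):

```
import okio.Buffer
import okio.GzipSink
import okio.GzipSource
import okio.Okio

// Compresses a string and reads it back with Okio's gzip streams, the same
// primitives BridgeInterceptor uses to transparently unwrap a gzipped body.
fun gzipRoundTrip(text: String): String {
    val compressed = Buffer()
    Okio.buffer(GzipSink(compressed)).use { it.writeUtf8(text) }   // write gzipped bytes
    return Okio.buffer(GzipSource(compressed)).readUtf8()          // read them back
}
```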

CacheInterceptor

CacheInterceptor handles HTTP response caching; with it, cached responses can be used effectively to cut down on network requests:

```
@Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

long now = System.currentTimeMillis();

// Build a cache strategy that decides how the cache will be used
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
// null means the network will not be used; non-null means it will
Request networkRequest = strategy.networkRequest;
// null means the cache will not be used; non-null means it will
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
    cache.trackResponse(strategy);
}

if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// If the network is forbidden and there is nothing cached, return an empty 504 Response straight away
if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
}

// If the network is forbidden but we do have a cached response, return the cache directly
if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
}

// The network is allowed: follow the normal flow and fetch the data over the network
Response networkResponse = null;
try {
    networkResponse = chain.proceed(networkRequest);
} finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
    }
}

// If there is a local cache entry and the server returned 304, refresh the cached entry (headers, request/response timestamps, etc.)
if (cacheResponse != null) {
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
    } else {
        closeQuietly(cacheResponse.body());
    }
}

Response response = networkResponse.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build();

if (cache != null) {
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Store the response in the cache
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
    }

    if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
            cache.remove(networkRequest);
        } catch (IOException ignored) {

        }
    }
}

return response;

}
```

CacheInterceptor uses the Request to look up a cached response in the Cache. There is no cache by default; one has to be configured when the OkHttpClient is built, and it is passed into the interceptor as a constructor argument. The rough flow is:

  • If there is no cached response and the network is disabled (configurable through Request.cacheControl), return a 504 Response immediately.
  • If there is a cached response and the network is disabled, return the cached response.
  • If neither shortcut applies, go through the normal network flow (chain.proceed, i.e. run the remaining interceptors).
  • If the network returns 304 and there is a local cache entry, use the (updated) local cache.
  • Finally build the Response; if the OkHttpClient has a cache configured, store the response in it, then return it to the caller. A usage sketch for configuring the cache follows below.
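For completeness, a small usage sketch of the public cache API that feeds this interceptor; the cache directory and URL are only placeholders:

```
import okhttp3.Cache
import okhttp3.CacheControl
import okhttp3.OkHttpClient
import okhttp3.Request
import java.io.File

// A client with a 10 MiB disk cache; the directory is just an example path.
val cachedClient = OkHttpClient.Builder()
    .cache(Cache(File("okhttp-cache"), 10L * 1024 * 1024))
    .build()

// Force this request to be served from cache only; with nothing cached it yields
// the 504 "Unsatisfiable Request (only-if-cached)" branch seen in CacheInterceptor.
val cacheOnlyRequest = Request.Builder()
    .url("https://api.github.com/users/square/repos")
    .cacheControl(CacheControl.FORCE_CACHE)
    .build()
```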

ConnectInterceptor

ConnectInterceptor and CallServerInterceptor are arguably the two most important interceptors. ConnectInterceptor is responsible for DNS resolution and the socket connection, and its code is very short:

```

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);

}
```
It merely grabs the streamAllocation carried by the chain, obtains the HttpCodec and RealConnection through it, and then moves on to the next interceptor. The interceptor hands essentially all of the work to streamAllocation, which we analyze in detail later; for now we are only outlining each interceptor's responsibility.

CallServerInterceptor

CallServerInterceptor is the last interceptor. By the time it runs, the earlier interceptors have fully prepared the request and the connection between client and server is open; this is where the data transfer actually happens:

```
@Override public Response intercept(Chain chain) throws IOException {
    // Grab the HttpCodec, StreamAllocation and RealConnection passed along the chain
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

long sentRequestMillis = System.currentTimeMillis();

realChain.eventListener().requestHeadersStart(realChain.call());
// Send the request headers to the server
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);

Response.Builder responseBuilder = null;
// If the request has a body (e.g. a POST request),
// wrap it and send it to the server
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    // With an "Expect: 100-continue" header, the client must wait for the server's "HTTP/1.1 100 Continue"
    // response before sending the body; if that response never arrives, the body is not sent.
    // Such a POST sends the headers first and only sends the body after receiving the 100 status.
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        // This is essentially socket IO:
        // force the buffered data to be written out to its destination (the server)
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        // Parse the response headers
        responseBuilder = httpCodec.readResponseHeaders(true);
    }

    // Write the request body to the server
    if (responseBuilder == null) {
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
        new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        // Write the body
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
    } else if (!connection.isMultiplexed()) {
        streamAllocation.noNewStreams();
    }
}

// Force a flush
httpCodec.finishRequest();

if (responseBuilder == null) {
    realChain.eventListener().responseHeadersStart(realChain.call());
    responseBuilder = httpCodec.readResponseHeaders(false);
}

// Build the Response
Response response = responseBuilder
        .request(request)
    .handshake(streamAllocation.connection().handshake())
    .sentRequestAtMillis(sentRequestMillis)
    .receivedResponseAtMillis(System.currentTimeMillis())
    .build();

int code = response.code();
if (code == 100) {
    // server sent a 100-continue even though we did not request one.
    // try again to read the actual response
    responseBuilder = httpCodec.readResponseHeaders(false);

    response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    code = response.code();
}

//..

return response;

}
```

The main logic is:

  • send the request headers to the server;
  • if there is a request body, send it as well;
  • read the response headers and build a Response object;
  • if there is a response body, build a new Response wrapping it.

All of this is done through HttpCodec, which we analyze next.

The Foundation of the Interceptors: Creating and Managing Connections, and Transferring Data

For ConnectInterceptor and CallServerInterceptor we only sketched their responsibilities above without looking into the implementation; now let's dig deeper. We start with StreamAllocation, which is created in RetryAndFollowUpInterceptor but only put to use in ConnectInterceptor. As mentioned in the ConnectInterceptor section, that interceptor handles DNS resolution and the socket connection, and it delegates all of that work to StreamAllocation. The key code is just these two lines:

```
//...
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);
```
ConnectInterceptor creates the HttpCodec and RealConnection instances and passes them down the chain as arguments to the next interceptor. streamAllocation.connection() simply returns one of StreamAllocation's fields. StreamAllocation itself is initialized in RetryAndFollowUpInterceptor, and its constructor does nothing but assign fields:

```
//RetryAndFollowUpInterceptor.java
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
    createAddress(request.url()), call, eventListener, callStackTrace);

//StreamAllocation.java
public StreamAllocation(ConnectionPool connectionPool, Address address, Call call,
    EventListener eventListener, Object callStackTrace) {
  // Connection pool
  this.connectionPool = connectionPool;
  // Details of the server to connect to
  this.address = address;
  // The request's Call
  this.call = call;
  this.eventListener = eventListener;
  this.routeSelector = new RouteSelector(address, routeDatabase(), call, eventListener);
  this.callStackTrace = callStackTrace;
}
```

StreamAllocation's key fields are:

  • connectionPool: manages HTTP connections and their reuse (created and supplied by OkHttpClient by default)
  • address: the details of the server to connect to, typically including hostname, port and proxy

Next we focus on the content and flow of StreamAllocation.newStream.

Managing and Creating HTTP Connections

```
public HttpCodec newStream(OkHttpClient client, Interceptor.Chain chain,
    boolean doExtensiveHealthChecks) {
  int connectTimeout = chain.connectTimeoutMillis();
  //...
  boolean connectionRetryEnabled = client.retryOnConnectionFailure();

try {
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
    writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
    HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

    synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
    }
} catch (IOException e) {
    throw new RouteException(e);
}

}

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
    boolean doExtensiveHealthChecks) throws IOException {
  // Loop until a usable connection is found
  while (true) {
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        pingIntervalMillis, connectionRetryEnabled);
    synchronized (connectionPool) {
      // A freshly created connection skips the health check
      if (candidate.successCount == 0) {
        return candidate;
      }
    }
    // Check whether the connection is still usable
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      noNewStreams();
      continue;
    }

    //...
}

}

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
  boolean foundPooledConnection = false;
  RealConnection result = null;
  Route selectedRoute = null;
  Connection releasedConnection;
  Socket toClose;
  synchronized (connectionPool) {
    //..

    // If a connection has already been allocated to this StreamAllocation, try to reuse it.
    // This can be triggered when RetryAndFollowUpInterceptor retries or follows redirects:
    // the StreamAllocation is created before that interceptor's retry loop, so
    // ConnectInterceptor may reach this point several times for the same allocation.
    releasedConnection = this.connection;
    toClose = releaseIfNoNewStreams();
    if (this.connection != null) {
        // A connection already exists and is usable
        result = this.connection;
        releasedConnection = null;
    }
    //..

    // On the normal, smooth path we end up here: fetch a connection from the pool by address
    if (result == null) {
        // Attempt to get a connection from the pool.
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
            foundPooledConnection = true;
            result = connection;
        } else {
            selectedRoute = route;
        }
    }
}
closeQuietly(toClose);

//...
if (result != null) {
    // A usable connection came from the pool; use it directly
    return result;
}

// If route selection is required, perform one round of it
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
    newRouteSelection = true;
    routeSelection = routeSelector.next();
}

synchronized (connectionPool) {
    if (canceled) throw new IOException("Canceled");

    if (newRouteSelection) {
        // Route selection yields a set of IPs; try the pool again for each route
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
            Route route = routes.get(i);
            Internal.instance.get(connectionPool, address, this, route);
            if (connection != null) {
                foundPooledConnection = true;
                result = connection;
                this.route = route;
                break;
            }
        }
    }

    // Still no connection: create a new one
    if (!foundPooledConnection) {
        if (selectedRoute == null) {
            selectedRoute = routeSelection.next();
        }

        route = selectedRoute;
        refusedStreamCount = 0;
        // Create the new connection
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
    }
}

//..

// Perform the TCP + TLS handshakes (this call blocks)
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
    connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());

synchronized (connectionPool) {
    reportedAcquired = true;

    // Put the newly established connection into the pool
    Internal.instance.put(connectionPool, result);

    //..
}

eventListener.connectionAcquired(call, result);
return result;

}
```

The flow of acquiring a connection can be simplified as follows (see the sketch after this list):

  • First try the connection this StreamAllocation already holds. Because of retries and redirects, StreamAllocation's methods may run several times, and later attempts can reuse the connection the Call established earlier.
  • If there is no current connection, try to take one from the connection pool, which is supplied by OkHttpClient.
  • If there is still none, perform route selection and try the pool once more.
  • If that also fails, build a RealConnection instance and perform the TCP + TLS handshakes.
  • Finally, put the newly created connection into the pool.

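A quick way to observe this pooling from the outside is to make two sequential calls to the same host and ask the pool how many connections it holds; a small sketch (URL is a placeholder, and the exact count can vary):

```
import okhttp3.OkHttpClient
import okhttp3.Request

fun demonstrateReuse() {
    val client = OkHttpClient()
    val request = Request.Builder()
        .url("https://api.github.com/users/square")
        .build()
    repeat(2) {
        client.newCall(request).execute().use { it.body()?.string() }
    }
    // Typically prints 1: the second call found the first call's connection in the pool.
    println("connections in pool: ${client.connectionPool().connectionCount()}")
}
```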
One thing to note: findHealthyConnection does not stop the moment it gets a connection. It loops and checks whether the connection it obtained is actually usable. The check looks like this:

```
public boolean isHealthy(boolean doExtensiveChecks) {
  // Check whether the socket is usable:
  // is the socket closed, or its input or output stream shut down?
  if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
    return false;
  }

// Is the HTTP/2 connection shut down?
if (http2Connection != null) {
    return !http2Connection.isShutdown();
}

//..

return true;

}
```

It checks whether the socket, its input and output streams, and the HTTP/2 connection have been closed.

Here is the link to the complete StreamAllocation source for reference.

We now know the rough flow, but a couple of questions remain:

  • How is a connection created and connected?
  • How is the connection pool implemented, and how does it manage connections?

Let's look at connection creation first:

Creating a Connection

The connection-creation code already appeared above:

```
result = new RealConnection(connectionPool, selectedRoute);

result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
    connectionRetryEnabled, call, eventListener);
```

A RealConnection instance is created and its connect method is invoked. The connectionPool passed to the constructor is mainly used as a lock. Trimmed down, connect looks like this:

```
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
    EventListener eventListener) {
  //...
  while (true) {
    try {
      if (route.requiresTunnel()) {
        // HTTP tunnel
        connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
        if (rawSocket == null) {
          break;
        }
      } else {
        // Establish the socket connection
        connectSocket(connectTimeout, readTimeout, call, eventListener);
      }
      // TLS setup for HTTPS requests
      establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
      //..
      break;
    } catch (IOException e) {
      //..
    }
  }

  //...
}

private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
    EventListener eventListener) throws IOException {
  Request tunnelRequest = createTunnelRequest();
  HttpUrl url = tunnelRequest.url();
  for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
    // Establish the socket connection
    connectSocket(connectTimeout, readTimeout, call, eventListener);
    tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
    if (tunnelRequest == null) break;
    closeQuietly(rawSocket);
    rawSocket = null;
    sink = null;
    source = null;
    eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
  }
}
```

When establishing a connection, OkHttp first checks whether a tunnel is required. If so, connectTunnel does some protocol exchange first and connectSocket then makes the socket connection; otherwise connectSocket is called directly. Tunnelling is not the focus of this article (honestly, I'm not familiar with it either), so let's go straight to connectSocket:

```

private void connectSocket(int connectTimeout, int readTimeout, Call call,
    EventListener eventListener) throws IOException {
  Proxy proxy = route.proxy();
  Address address = route.address();

// Create the Socket
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);

//..
try {
    // Connect the socket
    Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
    ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
    ce.initCause(e);
    throw ce;
}

try {
    // Create the input and output streams with Okio
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
    if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
    }
}

}
```

connectSocket has only three core steps:

  • create a Socket instance;
  • connect the socket;
  • wrap the socket in Okio input and output streams.

The Socket is created in one of two ways: directly through the Socket(Proxy proxy) constructor, or through the socketFactory held by the Address. That factory is initialized in OkHttpClient and defaults to DefaultSocketFactory, whose key code is:

```
class DefaultSocketFactory extends SocketFactory {

public Socket createSocket() {
    return new Socket();
}

public Socket createSocket(String host, int port)
throws IOException, UnknownHostException
{
    return new Socket(host, port);
}

public Socket createSocket(InetAddress address, int port)
throws IOException
{
    return new Socket(address, port);
}

public Socket createSocket(String host, int port,
    InetAddress clientAddress, int clientPort)
throws IOException, UnknownHostException
{
    return new Socket(host, port, clientAddress, clientPort);
}

public Socket createSocket(InetAddress address, int port,
    InetAddress clientAddress, int clientPort)
throws IOException
{
    return new Socket(address, port, clientAddress, clientPort);
}

}

```

Its only job is to expose methods that invoke the various Socket constructors. Next comes connecting the socket:

```
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
    throws IOException {
  socket.connect(address, connectTimeout);
}
```

Again very simple: it just calls Socket.connect.

What remains is Okio, which is essentially stream IO; see Okio Source Code and Flow Analysis for details.

So in the end an OkHttp connection is just a Socket, with Okio handling the IO and therefore the data transfer: plain sockets and plain stream IO.
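To see how little magic is left at this level, here is a bare-bones sketch that speaks HTTP/1.1 over a raw Socket using Okio, roughly what connectSocket plus Http1Codec amount to; the host is just an example and no error handling is included:

```
import okio.Okio
import java.net.InetSocketAddress
import java.net.Socket

fun rawGet(host: String = "example.com") {
    val socket = Socket()
    socket.connect(InetSocketAddress(host, 80), 10_000)   // like connectSocket()
    val sink = Okio.buffer(Okio.sink(socket))              // output stream
    val source = Okio.buffer(Okio.source(socket))          // input stream

    // What Http1Codec.writeRequest() does: request line + headers + blank line.
    sink.writeUtf8("GET / HTTP/1.1\r\nHost: $host\r\nConnection: close\r\n\r\n").flush()

    // What readResponseHeaders()/readHeaderLine() do: read until the blank line.
    var line = source.readUtf8LineStrict()
    while (line.isNotEmpty()) {
        println(line)
        line = source.readUtf8LineStrict()
    }
    socket.close()
}
```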

The Connection Pool and Connection Management

ConnectionPool manages the existing connections through a double-ended queue, connections, and limits the number of idle connections and how long an idle connection may stay alive:

```
public final class ConnectionPool {
  //...
  // Maximum number of idle connections
  private final int maxIdleConnections;
  // How long an idle connection may stay alive
  private final long keepAliveDurationNs;

  // All connections
  private final Deque<RealConnection> connections = new ArrayDeque<>();

  boolean cleanupRunning;

  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }
  //...
}
```

As the source shows, by default the ConnectionPool keeps at most 5 idle connections, each for at most 5 minutes of idleness.
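Both defaults can be overridden by supplying your own pool when building the client; a small sketch:

```
import okhttp3.ConnectionPool
import okhttp3.OkHttpClient
import java.util.concurrent.TimeUnit

// A client whose pool keeps at most 10 idle connections alive for 2 minutes.
val pooledClient = OkHttpClient.Builder()
    .connectionPool(ConnectionPool(10, 2, TimeUnit.MINUTES))
    .build()
```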

The code that fetches a connection from the pool:

```
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
  assert (Thread.holdsLock(this));
  for (RealConnection connection : connections) {
    if (connection.isEligible(address, route)) {
      streamAllocation.acquire(connection, true);
      return connection;
    }
  }
  return null;
}
```

It iterates over all the connections and compares the address and route to decide whether a connection can be reused.

The code that adds a connection to the pool:

```
void put(RealConnection connection) {
  assert (Thread.holdsLock(this));
  if (!cleanupRunning) {
    cleanupRunning = true;
    executor.execute(cleanupRunnable);
  }
  connections.add(connection);
}
```

Before the connection is added to the queue, some thread-pool work is kicked off as well. The relevant code:

```
public final class ConnectionPool {

  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  private final int maxIdleConnections;
  private final long keepAliveDurationNs;

  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };
}
```

Each time a new connection is added to the pool, the pool checks whether the executor is already running the cleanup task and starts it if not. The task itself is simple: it keeps calling cleanup, and between calls it blocks in wait for however long cleanup tells it to, producing a regular, ongoing sweep. Now the cleanup method:

```
long cleanup(long now) {
  int inUseConnectionCount = 0;
  int idleConnectionCount = 0;
  RealConnection longestIdleConnection = null;
  long longestIdleDurationNs = Long.MIN_VALUE;

synchronized (this) {
    // Iterate over the connections
    for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
    RealConnection connection = i.next();

    // Count the references held on this connection
    if (pruneAndGetAllocationCount(connection, now) > 0) {
        // The connection is in use
        inUseConnectionCount++;
        continue;
    }
    // The connection is idle
    idleConnectionCount++;

    // Track the idle connection that has been idle the longest
    long idleDurationNs = now - connection.idleAtNanos;
    if (idleDurationNs > longestIdleDurationNs) {
        longestIdleDurationNs = idleDurationNs;
        longestIdleConnection = connection;
    }
}

    if (longestIdleDurationNs >= this.keepAliveDurationNs
        || idleConnectionCount > this.maxIdleConnections) {
        // If the longest idle time exceeds the allowed keep-alive duration,
        // or there are more idle connections than allowed, evict the oldest idle connection
        connections.remove(longestIdleConnection);
    } else if (idleConnectionCount > 0) {
        // Neither eviction condition is met: compute and return the delay until the oldest connection expires
        return keepAliveDurationNs - longestIdleDurationNs;
    } else if (inUseConnectionCount > 0) {
        // Every connection is in use: come back after the full keep-alive duration
        return keepAliveDurationNs;
    } else {
        cleanupRunning = false;
        return -1;
    }
}

// Close the longest-idle connection that was evicted
closeQuietly(longestIdleConnection.socket());
return 0;

}
```

The logic:

  • Use pruneAndGetAllocationCount to count the in-use and idle connections and find the one that has been idle the longest;
  • If that longest idle time exceeds keepAliveDurationNs (5 minutes by default) or the number of idle connections exceeds maxIdleConnections (5 by default), evict the oldest idle connection;
  • If the eviction conditions are not met, compute and return the time until the next cleanup (or the full keep-alive duration if every connection is in use);
  • If there are no connections at all, set cleanupRunning to false and return -1, marking the automatic cleanup as stopped.

Putting cleanupRunnable and cleanup together: whenever a new connection enters the pool, the pool starts the cleanupRunnable loop (if it isn't running already), which keeps calling cleanup to try to evict connections. When nothing qualifies for eviction, the loop waits for the time cleanup returned; once no connections remain, cleanupRunnable terminates and resets the cleanupRunning flag, waiting for a new connection to start the loop again.

Data Transfer

Now for the most critical step of a network request: transferring the data. From the section on connections we know that OkHttp ultimately transmits data over a Socket and performs the IO with Okio; let's look at the concrete flow. First, HttpCodec, whose instance is created right after the RealConnection: HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

```
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
    StreamAllocation streamAllocation) throws SocketException {
  if (http2Connection != null) {
    return new Http2Codec(client, chain, streamAllocation, http2Connection);
  } else {
    socket.setSoTimeout(chain.readTimeoutMillis());
    source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
    sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
    return new Http1Codec(client, streamAllocation, source, sink);
  }
}
```

Depending on whether http2Connection is null it returns an Http2Codec or an Http1Codec, both of which implement the HttpCodec interface. Http1Codec's construction is trivial: it receives the OkHttpClient and StreamAllocation instances plus source and sink, the Okio input and output streams. http2Connection indicates an HTTP/2 connection; it is created in establishProtocol when the negotiated protocol includes HTTP/2. Its initialization looks like this:

```
private void startHttp2(int pingIntervalMillis) throws IOException {
  socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
  http2Connection = new Http2Connection.Builder(true)
      .socket(socket, route.address().url().host(), source, sink)
      .listener(this)
      .pingIntervalMillis(pingIntervalMillis)
      .build();
  http2Connection.start();
}
```
It is easy to see that Http2Codec is built on top of http2Connection, which itself wraps source and sink. We can reasonably guess that reading and writing data go through exactly these two streams.

To verify that, let's revisit CallServerInterceptor's flow for talking to the server:

  • send the request headers to the server;
  • if there is a request body, send it as well;
  • read the response headers and build a Response object;
  • if there is a response body, build a new Response wrapping it.
Sending Data to the Server

First, the request-sending code extracted from the interceptor:

```
// Write the request headers into the buffered output stream
httpCodec.writeRequestHeaders(request);
// Force a flush so the data is actually written to the server
httpCodec.flushRequest();

// Create a CountingSink, which is itself a Sink (an output stream)
CountingSink requestBodyOut =
    new CountingSink(httpCodec.createRequestBody(request, contentLength));
// Buffer the output stream
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// Write the request body into the buffered output stream
request.body().writeTo(bufferedRequestBody);
// Close the output stream to finish sending the request
bufferedRequestBody.close();
httpCodec.finishRequest();
```

That is the rough flow. Let's start with the Http1Codec implementation; HttpCodec.writeRequestHeaders() is:

```
@Override public void writeRequestHeaders(Request request) throws IOException {
  String requestLine = RequestLine.get(
      request, streamAllocation.connection().route().proxy().type());
  writeRequest(request.headers(), requestLine);
}

public void writeRequest(Headers headers, String requestLine) throws IOException {
  if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
  sink.writeUtf8(requestLine).writeUtf8("\r\n");
  for (int i = 0, size = headers.size(); i < size; i++) {
    sink.writeUtf8(headers.name(i))
        .writeUtf8(": ")
        .writeUtf8(headers.value(i))
        .writeUtf8("\r\n");
  }
  sink.writeUtf8("\r\n");
  state = STATE_OPEN_REQUEST_BODY;
}
```

The logic is simple: the headers are extracted from the Request and written to the server through the Okio sink.

Next is createRequestBody:

```

@Override public Sink createRequestBody(Request request, long contentLength) {
  if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
    return newChunkedSink();
  }
  if (contentLength != -1) {
    return newFixedLengthSink(contentLength);
  }
}

public Sink newChunkedSink() {
  if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
  state = STATE_WRITING_REQUEST_BODY;
  return new ChunkedSink();
}

public Sink newFixedLengthSink(long contentLength) {
  if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
  state = STATE_WRITING_REQUEST_BODY;
  return new FixedLengthSink(contentLength);
}
```
Depending on whether the length of the request body is known, it returns a ChunkedSink or a FixedLengthSink instance; both are inner classes of Http1Codec that implement the Sink interface and act as output streams.

The writes that follow are all Okio output-stream operations, and finally httpCodec.finishRequest() is called; its implementation is:

```
@Override public void finishRequest() throws IOException {
  sink.flush();
}
```
Nothing more than a plain flush of the output stream.

Now the Http2Codec implementation, starting with writeRequestHeaders:

```
@Override public void writeRequestHeaders(Request request) throws IOException {
  if (stream != null) return;

boolean hasRequestBody = request.body() != null;
List<Header> requestHeaders = http2HeadersList(request);
stream = connection.newStream(requestHeaders, hasRequestBody);
stream.readTimeout().timeout(chain.readTimeoutMillis(), TimeUnit.MILLISECONDS);
stream.writeTimeout().timeout(chain.writeTimeoutMillis(), TimeUnit.MILLISECONDS);

}
```

The work is handed over to stream, an Http2Stream instance:

```
public Http2Stream newStream(List<Header> requestHeaders, boolean out) throws IOException {
  return newStream(0, requestHeaders, out);
}

private Http2Stream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
    throws IOException {
  boolean outFinished = !out;
  boolean inFinished = false;
  boolean flushHeaders;
  Http2Stream stream;
  int streamId;

synchronized (writer) {
    synchronized (this) {
        // Compute the id for this stream
        if (nextStreamId > Integer.MAX_VALUE / 2) {
            shutdown(REFUSED_STREAM);
        }
        if (shutdown) {
            throw new ConnectionShutdownException();
        }
        streamId = nextStreamId;
        nextStreamId += 2;
        stream = new Http2Stream(streamId, this, outFinished, inFinished, null);
        flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L;
        if (stream.isOpen()) {
            streams.put(streamId, stream);
        }
    }
    if (associatedStreamId == 0) {
        writer.synStream(outFinished, streamId, associatedStreamId, requestHeaders);
    } else if (client) {
        throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
    } else { // HTTP/2 has a PUSH_PROMISE frame.
        writer.pushPromise(associatedStreamId, streamId, requestHeaders);
    }
}

if (flushHeaders) {
    writer.flush();
}

return stream;

}

```

The logic here mainly assigns the stream id and handles the PUSH_PROMISE frame; in the end it calls Http2Writer's synStream or pushPromise method, which again writes the data to the server through the Okio sink.

Its finishRequest method essentially just calls close on the Sink:

@Override public void finishRequest() throws IOException { stream.getSink().close(); }

Receiving Data from the Server

The code in CallServerInterceptor that receives data from the server, extracted:

```
// Read the response headers
Response.Builder responseBuilder = httpCodec.readResponseHeaders(false);
// Build the Response from the headers
response = responseBuilder
    .request(request)
    .handshake(streamAllocation.connection().handshake())
    .sentRequestAtMillis(sentRequestMillis)
    .receivedResponseAtMillis(System.currentTimeMillis())
    .build();

// If there is a response body, read it
response = response.newBuilder()
    .body(httpCodec.openResponseBody(response))
    .build();
```

So reading is likewise split into two parts, headers and body. First, httpCodec.readResponseHeaders in its Http1Codec implementation:

```
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
  //...
  try {
    // Read the status line
    StatusLine statusLine = StatusLine.parse(readHeaderLine());
    // Build the Response
    Response.Builder responseBuilder = new Response.Builder()
        .protocol(statusLine.protocol)
        .code(statusLine.code)
        .message(statusLine.message)
        .headers(readHeaders());
    //..
    return responseBuilder;
  } catch (EOFException e) {
    //...
  }
}

private String readHeaderLine() throws IOException {
  // Perform the IO read
  String line = source.readUtf8LineStrict(headerLimit);
  headerLimit -= line.length();
  return line;
}
```
So the headers are ultimately read, line by line, by readHeaderLine through the Okio source.

Next, the implementation of openResponseBody:

```
@Override public ResponseBody openResponseBody(Response response) throws IOException {
  streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
  String contentType = response.header("Content-Type");

if (!HttpHeaders.hasBody(response)) { 
  Source source = newFixedLengthSource(0);
  return new RealResponseBody(contentType, 0, Okio.buffer(source));
}

if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
  Source source = newChunkedSource(response.request().url());
  return new RealResponseBody(contentType, -1L, Okio.buffer(source));
}

long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
  Source source = newFixedLengthSource(contentLength);
  return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}

return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));

}
```

Much like writing the request body to the server, the strategy depends on the body length. newFixedLengthSource and newChunkedSource look like this:

```
public Source newFixedLengthSource(long length) throws IOException {
  if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
  state = STATE_READING_RESPONSE_BODY;
  return new FixedLengthSource(length);
}

public Source newChunkedSource(HttpUrl url) throws IOException {
  if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
  state = STATE_READING_RESPONSE_BODY;
  return new ChunkedSource(url);
}
```

FixedLengthSourceChunkedSource都繼承自AbstractSource,本質上都是Source輸入流。

Now the Http2Codec side, starting with readResponseHeaders:

```
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
  Headers headers = stream.takeHeaders();
  Response.Builder responseBuilder = readHttp2HeadersList(headers, protocol);
  if (expectContinue && Internal.instance.code(responseBuilder) == HTTP_CONTINUE) {
    return null;
  }
  return responseBuilder;
}
```
stream.takeHeaders fetches the response headers and readHttp2HeadersList builds the Response. Let's focus on stream.takeHeaders:

public synchronized Headers takeHeaders() throws IOException { readTimeout.enter(); try { while (headersQueue.isEmpty() && errorCode == null) { waitForIo(); } } finally { readTimeout.exitAndThrowIfTimedOut(); } if (!headersQueue.isEmpty()) { return headersQueue.removeFirst(); } throw new StreamResetException(errorCode); }

The real IO does not happen here. This method simply takes data from the deque `headersQueue`, and `waitForIo()` blocks the current flow until something can be taken from the queue. So where does the actual reading take place?

When the HTTP/2 connection is established, a new thread is created that keeps reading data and dispatches whatever it reads downstream. When `Http2Codec` starts up, the call chain is `RealConnection.establishProtocol` -> `RealConnection.startHttp2` -> `Http2Connection.start`, and the code that finally runs is:

```

public void start() throws IOException { start(true); }

void start(boolean sendConnectionPreface) throws IOException {
    if (sendConnectionPreface) {
        writer.connectionPreface();
        writer.settings(okHttpSettings);
        int windowSize = okHttpSettings.getInitialWindowSize();
        if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
            writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
        }
    }
    new Thread(readerRunnable).start(); // Not a daemon thread.
}

Http2Connection(Builder builder) {
    readerRunnable = new ReaderRunnable(new Http2Reader(builder.source, client));
}
```

The key logic here is the creation of a new thread to run `readerRunnable`, which is an instance of `ReaderRunnable`, defined as follows:

```
class ReaderRunnable extends NamedRunnable implements Http2Reader.Handler {
    final Http2Reader reader;

@Override protected void execute() {
    ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
    ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
    try {
        reader.readConnectionPreface(this);
         // comment ①
        while (reader.nextFrame(false, this)) {
        }
        connectionErrorCode = ErrorCode.NO_ERROR;
        streamErrorCode = ErrorCode.CANCEL;
    } catch (IOException e) {
        connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
        streamErrorCode = ErrorCode.PROTOCOL_ERROR;
    } finally {
        try {
            close(connectionErrorCode, streamErrorCode);
        } catch (IOException ignored) {
        }
        Util.closeQuietly(reader);
    }
}

} ```

The key part of this code is at comment ①, where `Http2Reader.nextFrame` is called in a loop:

```
public boolean nextFrame(boolean requireSettings, Handler handler) throws IOException {
    try {
        source.require(9); // Frame header size
    } catch (IOException e) {
        return false; // This might be a normal socket close.
    }

int length = readMedium(source);
if (length < 0 || length > INITIAL_MAX_FRAME_SIZE) {
    throw ioException("FRAME_SIZE_ERROR: %s", length);
}
byte type = (byte) (source.readByte() & 0xff);
if (requireSettings && type != TYPE_SETTINGS) {
    throw ioException("Expected a SETTINGS frame but was %s", type);
}
byte flags = (byte) (source.readByte() & 0xff);
int streamId = (source.readInt() & 0x7fffffff); // Ignore reserved bit.
if (logger.isLoggable(FINE)) logger.fine(frameLog(true, streamId, length, type, flags));

switch (type) {
    case TYPE_DATA:
    readData(handler, length, flags, streamId);
    break;

    case TYPE_HEADERS:
    readHeaders(handler, length, flags, streamId);
    break;

    case TYPE_PRIORITY:
    readPriority(handler, length, flags, streamId);
    break;

    case TYPE_RST_STREAM:
    readRstStream(handler, length, flags, streamId);
    break;

    case TYPE_SETTINGS:
    readSettings(handler, length, flags, streamId);
    break;

    case TYPE_PUSH_PROMISE:
    readPushPromise(handler, length, flags, streamId);
    break;

    case TYPE_PING:
    readPing(handler, length, flags, streamId);
    break;

    case TYPE_GOAWAY:
    readGoAway(handler, length, flags, streamId);
    break;

    case TYPE_WINDOW_UPDATE:
    readWindowUpdate(handler, length, flags, streamId);
    break;

    default:
    // Implementations MUST discard frames that have unknown or unsupported types.
    source.skip(length);
}
return true;

} ```

Not only does this code perform the read on `source`, it also inspects the frame type and calls a different method to handle each kind of data it has read. Next comes the part we care about, reading the Headers, in `Http2Reader.readHeaders`:

```
private void readHeaders(Handler handler, int length, byte flags, int streamId) throws IOException {
  //..
  List<Header> headerBlock = readHeaderBlock(length, padding, flags, streamId);

  handler.headers(endStream, streamId, -1, headerBlock);
}
```

This brings us to the familiar Handler step. Is this inter-thread communication, switching from the thread started by `Http2Connection` back to the thread our request runs on? Not quite: this Handler is `Http2Reader.Handler`. Remember `ReaderRunnable` from above? It was passed into the `Http2Connection` thread as a Runnable, and it implements not only the `Runnable` interface but also the `Http2Reader.Handler` interface. `ReaderRunnable`'s implementation of `Http2Reader.Handler.headers` ultimately calls `Http2Stream.receiveHeaders`:

void receiveHeaders(List<Header> headers) { assert (!Thread.holdsLock(Http2Stream.this)); boolean open; synchronized (this) { hasResponseHeaders = true; /* add the headers to the queue */ headersQueue.add(Util.toHeaders(headers)); open = isOpen(); notifyAll(); } if (!open) { connection.removeStream(id); } }

This is where the data gets added to the queue. The `takeHeaders` method shown earlier only has to keep taking entries from `headersQueue` to obtain the response headers.
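The hand-off between the two threads is ordinary `wait`/`notifyAll` around a queue. Below is a minimal sketch of that pattern (the real `Http2Stream` additionally deals with timeouts, error codes and stream state):

```
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the headersQueue hand-off: the reader thread produces,
// the request thread blocks in take() until something arrives.
class HeadersMailbox<T> {
  private final Deque<T> queue = new ArrayDeque<>();

  synchronized void put(T headers) {   // reader thread (receiveHeaders)
    queue.add(headers);
    notifyAll();                       // wake any thread parked in take()
  }

  synchronized T take() throws InterruptedException { // request thread (takeHeaders)
    while (queue.isEmpty()) {
      wait();                          // corresponds to waitForIo()
    }
    return queue.removeFirst();
  }
}
```

In these terms, `receiveHeaders` plays the role of `put()` on the reader thread, and `takeHeaders` plays the role of `take()` on the thread executing the call.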

Next is reading the response body. The `Http2Codec` implementation of `openResponseBody` looks like this:

```
@Override public ResponseBody openResponseBody(Response response) throws IOException {
  streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
  String contentType = response.header("Content-Type");
  long contentLength = HttpHeaders.contentLength(response);
  Source source = new StreamFinishingSource(stream.getSource());
  return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}

class StreamFinishingSource extends ForwardingSource {
  boolean completed = false;
  long bytesRead = 0;

StreamFinishingSource(Source delegate) {
  super(delegate);
}

@Override public long read(Buffer sink, long byteCount) throws IOException {
  try {
    long read = delegate().read(sink, byteCount);
    if (read > 0) {
      bytesRead += read;
    }
    return read;
  } catch (IOException e) {
    endOfInput(e);
    throw e;
  }
}

@Override public void close() throws IOException {
  super.close();
  endOfInput(null);
}

private void endOfInput(IOException e) {
  if (completed) return;
  completed = true;
  streamAllocation.streamFinished(false, Http2Codec.this, bytesRead, e);
}

} ```

A delegate is involved here: the actual work ends up in a `FramingSource` object, an inner class of `Http2Stream`. The `Response.body` we finally obtain is itself backed by a `Source`; when we pull its content with `readUtf8`, the call eventually lands in `FramingSource`'s `read` method:

```
@Override public long read(Buffer sink, long byteCount) throws IOException {
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);

while (true) {
    //..

    synchronized (Http2Stream.this) {
        readTimeout.enter();
        try {
            if (errorCode != null) {
                // Prepare to deliver an error.
                errorCodeToDeliver = errorCode;
            }

            if (closed) {
                throw new IOException("stream closed");

            } else if (!headersQueue.isEmpty() && headersListener != null) {
                // Prepare to deliver headers.
                headersToDeliver = headersQueue.removeFirst();
                headersListenerToNotify = headersListener;

            } else if (readBuffer.size() > 0) {
                // Read data out of the stream's receive buffer
                readBytesDelivered = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
                unacknowledgedBytesRead += readBytesDelivered;

                if (errorCodeToDeliver == null
                    && unacknowledgedBytesRead
                    >= connection.okHttpSettings.getInitialWindowSize() / 2) {
                    // Flow control: notify the peer that we're ready for more data! Only send a
                    // WINDOW_UPDATE if the stream isn't in error.
                    connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
                    unacknowledgedBytesRead = 0;
                }
            } else if (!finished && errorCodeToDeliver == null) {
                // Nothing to do. Wait until that changes then try again.
                waitForIo();
                continue;
            }
        } finally {
            readTimeout.exitAndThrowIfTimedOut();
        }
    }


    if (headersToDeliver != null && headersListenerToNotify != null) {
        headersListenerToNotify.onHeaders(headersToDeliver);
        continue;
    }

    if (readBytesDelivered != -1) {
        // Update connection.unacknowledgedBytesRead outside the synchronized block.
        updateConnectionFlowControl(readBytesDelivered);
        return readBytesDelivered;
    }
    //...
    return -1; // This source is exhausted.
}

} ```
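Seen from the caller's side, this means the `ResponseBody` handed back by OkHttp is itself backed by a `Source`, so it can be consumed incrementally instead of in a single `readUtf8()`. A small sketch, assuming `response` came from an earlier synchronous `execute()` call so that its body is non-null:

```
import java.io.IOException;

import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.Buffer;
import okio.BufferedSource;

public class StreamingReadSketch {
  // `response` is assumed to come from client.newCall(request).execute().
  static void printBody(Response response) throws IOException {
    try (ResponseBody body = response.body()) {
      BufferedSource source = body.source();
      Buffer buffer = new Buffer();
      long read;
      // Each read may park inside FramingSource.read until the reader
      // thread has appended more bytes to readBuffer.
      while ((read = source.read(buffer, 8192)) != -1) {
        System.out.print(buffer.readUtf8());
      }
    }
  }
}
```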

Summary

The core of OkHttp comes down to the following points:

  • Asynchronous requests switch threads through a thread pool;
  • The overall design is a chain of responsibility: interceptors process the request and the response in layers (see the sketch after this list);
  • Connections are managed through a connection pool so they are reused instead of being created frequently and repeatedly;
  • Connections are established over Socket, and the underlying IO is implemented with Okio.
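As a concrete reminder of the chain-of-responsibility point above, here is a minimal application-level interceptor (a logging sketch, not part of OkHttp itself) that sits in the chain and forwards the request with `chain.proceed`:

```
import java.io.IOException;

import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class LoggingInterceptor implements Interceptor {
  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    long start = System.nanoTime();
    Response response = chain.proceed(request); // hand off to the next interceptor in the chain
    long tookMs = (System.nanoTime() - start) / 1_000_000;
    System.out.println(request.url() + " -> " + response.code() + " in " + tookMs + "ms");
    return response;
  }

  // Sketch of wiring it into a client:
  static OkHttpClient newClient() {
    return new OkHttpClient.Builder()
        .addInterceptor(new LoggingInterceptor())
        .build();
  }
}
```

Interceptors registered with `addInterceptor` run before OkHttp's built-in interceptors; `addNetworkInterceptor` would instead place them between `ConnectInterceptor` and `CallServerInterceptor`.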

One final note: this article focuses on OkHttp's main flow. Many of the HTTP-related topics it touches on, such as multiplexing, DNS, proxies and routing, are not analysed in detail. That is partly a matter of space, but mostly because my own grasp of those topics is not yet solid enough to write about them with confidence. If you spot any mistakes, please point them out so we can all learn and improve together.