Zookeeper原始碼篇4-新建連線互動流程分析(單機Server服務端與Client客戶端)

語言: CN / TW / HK

歡迎大家關注 github.com/hsfxuebao ,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈

上一篇文章分析過ZK服務端的重要元件以及單機情況下的啟動流程,瞭解了ZK的基本組成。Client客戶端架構元件,單機Server服務端元件架構啟動

對於ZK在執行時有諸多疑問,ZK對於IO多路複用是如何使用的?Client端和Server端是通過什麼方式實現了互動協議的?以及在ZK的執行過程中起到關鍵作用的元件和不同操作的互動流程又是怎樣的?

本篇文章將會從Client端、Server端的互動流程入手,先分析兩端的互動流程以及重要元件,隨後再仔細分析這些重要元件的作用及生效方式來解決上述的幾個問題。需要注意的是互動操作分析的是新建連線,對於新建節點、ping以及關閉連線等的操作大致流程和新建是差不多的,只是內部的操作稍微有點變化而已,這些後續再來分析。

注:本篇基於ZK版本3.7分析的,且需要對ZK的架構以及重要元件組成有一定的基礎瞭解,當然先看完流程再去了解那些架構元件也行,但還是推薦先去了解Server端以及Client端的架構元件,因為本篇不會進行一些基礎性的分析。

1. 互動流程

對於ZK來說,Client和Server端的互動流程值得學習,無論是對於NIO或者Netty的使用,還是為了解決通訊資料傳遞問題。

本次互動流程只考慮正常連線情況,並且將其流程拆解為三步走,分別為:1、客戶端發起連線Server端請求;2、Server端收到並處理響應Client端的連線請求;3、收到Server端的響應生成對應的響應事件觸發本地的監聽器。大致互動流程圖如下:

image.png

接下來看看三步走的具體詳細互動流程圖。

1.1 Client端發起連線

三步走中的第一步具體詳細流程圖如下:

image.png

在這個步驟中擔任主要職責的便是SendThread,圖中簡單的把流程分為了11個步驟,這個圖中如果看過了上兩篇關於ZK服務和客戶端端的重要元件便可以清晰都知道其大致作用,只是圖中加入了NIO的一些類而已。接下來具體分析一下在各個步驟中的一些小細節:

  • C1:對應著Java應用程式傳入連結串並例項化ZooKeeper類進行連線;
  • C2:說明ZooKeeper只是一個API類而已,實際和Server端的互動邏輯並不在這個類中,這個類只會封裝或設定請求引數並交給ClientCnxn實際連線物件處理;
  • C3:這個步驟總的來說是初始化並啟用SendThread和EventThread兩個執行緒物件,SendThread執行緒負責輪詢NIO事件和心跳檢測,EventThread負責處理Server端響應的事件,在圖中分為了四個步驟是為了更直觀的看具體的操作以及大致作用;
  • C4:實際上有非常多的邏輯:更新並判斷處理心跳檢測時間、在剛開始的時候嘗試第一次連線Server端服務、以及判斷驗證資訊等。並且SendThread是一個執行緒物件,C4這個流程將會被反覆呼叫,這也就是為什麼途中有C4.1和C4.2了,C4.1對應C5、C6、和C7,而C4.2則對應C8那條鏈路的流程;
  • C5:為什麼這裡要說嘗試第一次連線?因為在這個流程中會呼叫一次SocketChannel的連線方法connect,而SocketChannel在建立的時候就已經被設定為阻塞狀態了,因此connect方法即使沒有立即連上去,也會後續通過一個OP_CONNECT事件來通知,但也有機率第一次就連上去,雖然機率比較小,下面的C6和C7便是針對C5第一次沒連線上而採取的措施;
  • C6:當第一次嘗試連線沒連上,後續NIO的Server端連上了Client端的Selector將會收到NIO事件OP_CONNECT,並獲取到相應的SelectionKey和SocketChannel物件;
  • C7:獲取到NIO的Socket後將會完成連線,並且生成對應的ConnectRequest和Packet物件放到outgoingQueue陣列中以便下次開啟OP_WRITE寫事件後能夠傳送資料包;
  • C8:這個流程應該要從C3.3開始走起,當C7完成連線並且把資料包儲存在outgoingQueue陣列中時,將會開啟OP_WRITE事件,此時SendThread迴圈時將可以通過Selector獲取到這個寫事件,從而進入C8流程判斷為寫操作;
  • C9:當確認進入寫操作時將會把Packet取出來以便後續流程使用;
  • C10:呼叫C9流程拿出來的Packet物件的createBB()方法,將其序列化並存放到Packet物件中的ByteBuffer快取物件中;
  • C11:使用獲取到的SocketChannel物件將ByteBuffer快取物件中的序列化資料傳送至Server端,並隨後繼續判斷outgoingQueue和Packet物件是否還有資料,如果有資料則保持OP_WRITE事件開啟,否則關閉OP_WRITE,只進行監聽Server端的資料。 如果對NIO的互動流程有一定的瞭解,對於ZK為何要這樣實現的應該能理解一二,如果對NIO互動流程不怎麼熟悉的,也可以參照ZK的使用,自己寫一個通訊多路複用的Demo。

1.2 Server接收處理及響應

三步走中的第二步Server端互動處理流程如下:

image.png

從ZK系列的第二篇文章可以知道NIOServerCnxnFactory在ZK啟動時也是以一個守護執行緒物件執行的,會一直通過Selector輪詢是否有新的IO事件,如果有則根據IO事件型別進行相應的處理,接下來詳細分析下其具體的互動流程:

  • S1:守護執行緒物件將會每隔1s使用Selector輪詢是否有新的IO事件;
  • S2:當Client端呼叫了SocketChannel.connect()方法時,Selector將會收到OP_ACCEPT連線型別的NIO事件,並獲取對應的SocketChannel和SelectionKey物件;
  • S3:當確認是OP_ACCEPT事件時,將會先判斷是否到達最大連線數了,滿足則不會建立新的連線物件,否則註冊SocketChannel生成SelectionKey,並設定成OP_READ讀模式,根據這兩個物件建立NIO連線物件NIOServerCnxn;
  • S4:會將NIOServerCnxn和SelectionKey進行繫結,並將NIOServerCnxn新增到cnxns陣列和ip-NIOServerCnxn對應關係的map物件ipMap,執行完該流程將會監聽等待Client端後續的請求;
  • S5:執行到這個流程時說明Client端已經執行到了C11(即傳送具體的連線請求)流程了,此時Server端將會收到來自Client端Socket的IO事件;
  • S6:前面收到的IO事件對應的SelectionKey操作型別是OP_READ,將會獲取和其繫結的NIOServerCnxn物件;
  • S7:這一步會呼叫NIOServerCnxn物件的doIO()方法,這裡面將會根據是否初始化來判斷是讀取連線請求還是普通的請求,當然在我們這個流程中讀取的是連線請求;
  • S8:將接收到的ByteBuffer反序列化成ConnectRequest請求物件,並根據Server端的心跳間隔時間以及Client傳過來的SessionTimeout過期時間做一箇中和判斷,得出session的過期時間,並利用前面獲得的NIOServerCnxn以及sesion過期時間在SessionTracker中建立session並進行跟蹤,後續在分析ping心跳檢測操作時再詳細分析;
  • S9:根據已有物件資訊建立Request物件,這個物件代表Client的每次具體請求,請求的內容以及相關的session資訊都會在這個類中,並且後續的RequestProcessor系列物件處理最小單元便是Request物件型別,呼叫下一個流程前會更新一波session的過期時間;
  • S10:這個流程的具體執行是在RequestProcessor處理器實現類PrepRequestProcessor中完成的,其也是一系列實現類中的第一個執行類,,主要完成的操作便是根據Request中的header物件操作型別type屬性來建立對應的CreateSessionTxn物件;
  • S11:執行完S10後S11做的操作只有更新一下session失效時間,其它的流程在單機執行中並未起到很大作用;
  • S12:此時已經呼叫到了第二個RequestProcessor處理器SyncRequestProcessor中,這個處理器做的事情便是儲存請求日誌和執行快照,具體的處理細節後續看有機會再仔細分析一波;
  • S13:當完成對logDir位置進行日誌新增時,將會呼叫到下一個RequestProcessor處理器- FinalRequestProcessor中,在這裡面完成最後的處理及響應;
  • S14:這個流程是同步的,並且在這個流程中也會重新整理一次session過期時間,並重新整理ZK的serverStatus和NIOServerCnxn的近期呼叫狀態時間等;
  • S15:最後把session過期時間、sessionId以及生成的隨機密碼等序列化到ByteBuffer中,隨後通過SocketChannel寫入到IO通道中通知Client端。這是正常流程,還有一種便是響應太長導致一次性發不完便會再次使用NIO的Selector.select()方法處理自身產生的寫事件,直到把響應全部寫完。直到這個流程Server端的連線處理響應流程便全部走完。

在跟蹤這一次請求原始碼時ZK進行了多次重新整理session過期時間,為什麼ZK要在各個Request處理器中都進行一次重新整理session過期時間呢?以併發量小的角度看這個問題可能會很不解,因為針對僅有一次的請求情況,各個處理器之間相當於是同步處理的,所以看起來沒有那麼大的必要;但如果ZK的併發量高了起來,單機部署的情況下除了SyncRequestProcessor呼叫FinalRequestProcessor是同步流程,其它的都是執行緒非同步的,一個響應由ZooKeeperServer類呼叫到FinalRequestProcessor可能中間會相差比較長的時間,如果只在開始呼叫或者結束呼叫的地方進行session過期時間重新整理,中途可能session追蹤器便已經把那些過期時間短的session當做過期的處理了。

當然上述只是我的猜測,ZK獲取有更多的考量,其它方面的原因便需要後續對ZK的深入瞭解才能知道了。

1.3 Client端接收Server端響應

三步走中的最後一步互動流程圖如下:

image.png

這個流程相對於前兩步而言步驟不是很多,大致就兩點:1、Client端監聽收到響應;2、觸發本地的監聽器對發生的ZK事件進行處理。具體流程分析如下:

  • C1:這個步驟可以看成兩個同時進行的步驟:C1.1為SendThread執行緒輪詢Selector監聽Server端是否有新的響應,C2.2為EventThread監聽waitingEvents陣列是否有等待事件處理,需要注意的是waitingEvents陣列中的元素只會通過SendThread執行緒收到響應處理後新增進去,因此waitingEvents陣列的來源可以看成就是SendThread新增的容易理解一點;
  • C2:只針對新建連線而言,這個步驟獲取到的IO事件為OP_READ;
  • C3:判斷IO事件的型別,將會進入doIO()方法讀取SocketChannel的資料;
  • C4:使用前面讀取到的ByteBuffer資料,反序列化成ConnectResponse物件;
  • C5:根據響應物件的屬性設定心跳檢測需要的屬性,如readTimeout、connectTimeout和negotiatedSessionTimeout等,最後會根據KeeperState生成WatchedEvent物件;
  • C6:通過WatchedEvent的KeeperState更新session的狀態;
  • C7:根據ClientWatchManager以及傳入進來的WatchedEvent生成WatcherSetEventPair物件,儲存了需要觸發的監聽器以及對應的響應事件;
  • C8:將WatcherSetEventPair新增到waitingEvents陣列中,waitingEvents陣列中的物件也有可能是Packet型別的物件;
  • C9:通過C1.2開始一直輪詢waitingEvents陣列,當完成C8之後,C9將可以輪詢到事件物件WatcherSetEventPair並進行處理;
  • C10:如果event物件不是eventOfDeath(關閉EventThread執行緒物件標識)物件,則會判斷是否為WatcherSetEventPair型別物件,如果是則遍歷物件中的watcher物件,並傳入事件物件WatchedEvent進行回撥;否則會呼叫Packet中的AsyncCallback物件進行非同步回撥。

截止到這裡回撥基本上就已經結束了,第三步無論是新建連線還是觸發事件進行回撥流程都是一樣的,都是一直輪詢waitingEvents陣列,並判斷型別呼叫相應的監聽器。後續分析操作命令互動以及ping等操作時這一步都是通用的。

2. Client發起連線原始碼分析

2.1 ZooKeeper入口類

前面說過,ZooKeeper是ZK客戶端的API類,連線以及其它的操作都是以這個類為入口的,接下來看下其新建連線的對外介面: ```java public class ZooKeeper { protected final ClientCnxn cnxn; private final ZKWatchManager watchManager = new ZKWatchManager(); public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException { // 一般而言新建連線都是使用的這個介面 this(connectString, sessionTimeout, watcher, false); }

public ZooKeeper(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly) throws IOException {
    // createDefaultHostProvider() 解析給出的server的址址,並對解析結果進行第一次shuffle
    this(connectString, sessionTimeout, watcher, canBeReadOnly, createDefaultHostProvider(connectString));
}
private static HostProvider createDefaultHostProvider(String connectString) {
    // ConnectStringParser() 用於解析指定的server地址列表字串
    // getServerAddresses() 獲取到所有解析出來的地址集合
    return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
}

public ZooKeeper(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly,
    HostProvider hostProvider,
    ZKClientConfig clientConfig
) throws IOException {
    LOG.info(
        "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
        connectString,
        sessionTimeout,
        watcher);

    validateWatcher(watcher);
    this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
    this.hostProvider = hostProvider;
    // 建立一個zk叢集字串解析器,將解析出的ip與port構建為一個地址例項,放入到快取集合
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    // 建立一個對server的連線
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this.clientConfig,
        watcher,
        getClientCnxnSocket(),
        canBeReadOnly);
    // 開始連線
    cnxn.start();
}

} ``createDefaultHostProvider(connectString)`建立主機提供者,把將快取集合中的地址打散:

```java public StaticHostProvider(Collection serverAddresses) { // init() 的第三個引數是建立了一個地址處理器 // init()中進行了第一次地址的shuffle init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() { // 根據指定的主機名,獲取到所有其對應的ip地址 @Override public InetAddress[] getAllByName(String name) throws UnknownHostException { return InetAddress.getAllByName(name); } });

    private void init(Collection<InetSocketAddress> serverAddresses, long randomnessSeed, Resolver resolver) {
        this.sourceOfRandomness = new Random(randomnessSeed);
        this.resolver = resolver;
        if (serverAddresses.isEmpty()) {
            throw new IllegalArgumentException("A HostProvider may not be empty!");
        }
        // 對地址的第一次打散(shuffle)
        this.serverAddresses = shuffle(serverAddresses);
        currentIndex = -1;
        lastIndex = -1;
    }

}

```

打散的目的在於負載均衡,不然每個客戶端輪詢都會連上第一個

2.2 ClientCnxn連線互動類

這個類裡面有EventThread和SendThread,這兩個內部類是ZK互動時最重要的兩個類,前面也提過,接下來看下ClientCnxn是如何啟動初始化這兩個內部執行緒類的。 java public class ClientCnxn { // 當Client端的資料包Packet被髮送出去時,如果不是ping和auth兩種操作型別,其 // 它操作型別的包都會儲存在佇列末尾,代表著已傳送但未完成的資料,在最後Client // 端收到ZK的響應時,將會把佇列第一個拿出來進行響應的處理。採用的是FIFO模式, // 是因為ZK的Server端接收請求處理請求是有序的,處理完前面一個才會處理後面一個 // 因此客戶端可以採用FIFO的模式處理 private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); // 傳送佇列,當Client端有請求需要傳送時將會封裝成Packet包新增到這裡面,在 // SendThread執行緒輪詢到有資料時將會取出第一個包資料進行處理髮送。使用的也是 // FIFO模式 private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>(); // 連線時間,初始化時等於客戶端sessionTimeout / 可用連線串數量,如果連線成功 // 後將會等於協約時間negotiatedSessionTimeout / 可用連線串數量,因此正常 // 而言,此值就是negotiatedSessionTimeout / 可用連線串數量 private int connectTimeout; // 協約時間,ZK的Server端會設定tickTime,Client端會傳sessionTimeout,ZK的 // Server端將會根據兩邊的配置進行計算得出兩邊都能接受的時間,然後返回。這個 // 欄位儲存的就是協商之後的session過期時間 private volatile int negotiatedSessionTimeout; // 讀取過期時間,連線時值為sessionTimeout * 2 / 3,當連線成功後值為 // negotiatedSessionTimeout * 2 / 3 private int readTimeout; // 開發人員自己定義的客戶端過期時間sessionTimeout(注意這個時間並不是最終 // Client端執行時的心跳檢測時間,後續會出一篇這些時間的具體作用以及計算規則) private final int sessionTimeout; // 入口類的引用物件 private final ZooKeeper zooKeeper; // 客戶端的監聽器管理類,包含了預設監聽器和三種不同型別的監聽器 private final ClientWatchManager watcher; // 本客戶端連線例項的sessionId private long sessionId; // 是否只可讀 private boolean readOnly; // 將來將會被刪除,暫時不知道有何用 final String chrootPath; // Client端對Server端傳送和接收訊息的執行緒物件 final SendThread sendThread; // Client端負責處理響應事件的執行緒物件 final EventThread eventThread; // Client端的連線是否已經關閉 private volatile boolean closing = false; // 連線串的解析後獲得的InetSocketAddress提供物件 private final HostProvider hostProvider; public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException { // 沒有連線密碼的建構函式 this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16], canBeReadOnly); } public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { // 最終呼叫賦值的建構函式 this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; this.sessionTimeout = sessionTimeout; this.hostProvider = hostProvider; this.chrootPath = chrootPath; // 計算未連線時的過期時間 connectTimeout = sessionTimeout / hostProvider.size(); readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; // 初始化兩個執行緒物件 sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); } public void start() { // 分別啟動兩個內部執行緒類 sendThread.start(); eventThread.start(); } }

2.3 SendThread傳送連線請求

在ClientCnxn中啟動SendThread執行緒後接下來的主角便只是SendThread以及呼叫的類了,而EventThread類只是在處理事件物件時會分析到。這個類是通過一直迴圈來進行不同的操作,因此不要把這個流程看成只有單一的功能,接收、傳送以及ping等操作都是在迴圈中完成的,但現在我們只分析傳送連線請求的程式碼。 ```java class SendThread extends ZooKeeperThread { // 客戶端連線Server端的負責物件,預設採用的是NIO方式連線 private final ClientCnxnSocket clientCnxnSocket; // 是否為第一次連線,預設是true private boolean isFirstConnect = true;

@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
    // 更新clientCnxnSocket的傳送事件以及關聯SendTreahd,這裡sessionId
    // 沒有值,就是0
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    // 上次ping和現在的時間差
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    // 只要連線沒有關閉,也沒有驗證失敗,就一直迴圈
    while (state.isAlive()) {
        try {
            // 剛開始執行時這裡肯定是未連線的狀態,因此會進去
            if (!clientCnxnSocket.isConnected()) {
                // don't re-establish connection if we are closing
                // 如果ZK已經關閉了則直接會出迴圈
                if (closing) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    // 獲取要連線的server的址
                    serverAddress = hostProvider.next(1000);
                }
                onConnecting(serverAddress);
                // 開始連線
                startConnect(serverAddress);
                // Update now to start the connection timer right after we make a connection attempt
                clientCnxnSocket.updateNow();
                // 更新互動(連線請求/讀寫請求)時間戳
                clientCnxnSocket.updateLastSendAndHeard();
            }

            ...
                // 獲取已經有多久沒有收到互動響應了
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                // 獲取已經有多久沒有收到連線請求的響應了
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

            // 處理會話超時的情況
            if (to <= 0) {
                String warnInfo = String.format(
                    "Client session timed out, have not heard from server in %dms for session id 0x%s",
                    clientCnxnSocket.getIdleRecv(),
                    Long.toHexString(sessionId));
                LOG.warn(warnInfo);
                // 丟擲會話超時異常
                throw new SessionTimeoutException(warnInfo);
            }
            ...
            // 這個方法十分重要,因為不管是連線還是其它任何操作都會進入
            // 該方法進行操作型別判斷已經發送接收資料包,具體流程留到
            // 後續分析clientCnxnSocket物件時再看
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            ... 異常處理
        }
    }
    // 跑到這裡說明ZK已經關閉了,後面會做一些善後的工作,如傳送關閉事件
    // 清除連線的快取資料等
    synchronized (outgoingQueue) {
        // When it comes to this point, it guarantees that later queued
        // packet to outgoingQueue will be notified of death.
        cleanup();
    }
    clientCnxnSocket.close();
    ...
}

private void startConnect(InetSocketAddress addr) throws IOException {
    ...
    // 修改server狀態
    changeZkState(States.CONNECTING);

    String hostPort = addr.getHostString() + ":" + addr.getPort();
    MDC.put("myid", hostPort);
    // 設定連線名稱
    setName(getName().replaceAll("\(.*\)", "(" + hostPort + ")"));
    // 判斷是否開啟了SASL的客戶端驗證機制(C/S模式的驗證機制)
    if (clientConfig.isSaslClientEnabled()) {
        ...
    }
    // 進行連線的日誌列印
    logStartConnect(addr);
    // 呼叫clientCnxnSocket的連線方法
    clientCnxnSocket.connect(addr);
}

void primeConnection() throws IOException {

    // 呼叫了這個方法說明客戶端和Server端的Socket長連線已經連線完畢了
    // 設定isFirstConnect為false
    isFirstConnect = false;
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    // 建立連線的請求物件ConnectRequest
    ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
    // We add backwards since we are pushing into the front
    // Only send if there's a pending watch
    // disableAutoWatchReset對應著ZK的啟動屬性
    // zookeeper.disableAutoWatchReset,如果為false則為自動將ZK的
    // 監聽器監聽到相應的節點,為true則不會自動監聽
    if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
        // 接下來的流程大概就是從zooKeeper獲取三種類型的監聽器
        // 把三種類型的監聽器依次封裝成SetWatches包儲存到
        // outgoingQueue包中以便後續傳送包資料,具體的流程便忽略
        List<String> dataWatches = watchManager.getDataWatchList();
        List<String> existWatches = watchManager.getExistWatchList();
        List<String> childWatches = watchManager.getChildWatchList();
        List<String> persistentWatches = watchManager.getPersistentWatchList();
        List<String> persistentRecursiveWatches = watchManager.getPersistentRecursiveWatchList();
        if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
                || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
            ...
            // 輪詢三種的迭代器獲取迭代器具體資料
            while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
                    || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
                ...
                // 將獲取到的監聽器封裝成SetWatches物件
                Record record;
                int opcode;
                if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
                    // maintain compatibility with older servers - if no persistent/recursive watchers
                    // are used, use the old version of SetWatches
                    record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
                    opcode = OpCode.setWatches;
                } else {
                    record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
                            childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
                    opcode = OpCode.setWatches2;
                }
                RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
                // 隨後使用Packet封裝Header和Recrod
                Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
                // 新增到outgoingQueue資料中
                outgoingQueue.addFirst(packet);
            }
        }
    }

    ...
    // 將ConnectRequest同樣封裝成Packet物件放到outgoingQueue中
    outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
    // 開啟OP_WRITE操作,開啟後Selector.select()將可以收到讀IO
    clientCnxnSocket.connectionPrimed();
    LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}

} ```

從原始碼可以看出來SendThread只是一個執行緒輪詢呼叫類,具體的傳送和接收操作是交給ClientCnxnSocket物件來完成的。

2.4 ClientCnxnSocket套接字互動類

和Socket進行互動的類,負責向Socket中寫入資料和讀取資料。在連線流程中最重要的兩個方法connect和doTransport都是在這個類中,根據在SendThread類中的流程,我們先分析connect,再去看doTransport方法。

```java public class ClientCnxnSocketNIO extends ClientCnxnSocket { // NIO的多路複用選擇器 private final Selector selector = Selector.open(); // 本Socket對應的SelectionKey private SelectionKey sockKey;

@Override
void connect(InetSocketAddress addr) throws IOException {

    // 建立一個NIO的channel
    SocketChannel sock = createSock();
    try {
        // 這個方法的作用便是註冊並嘗試進行連線
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        // 註冊socket失敗
        ...
    }
    // 設定為非初始化
    initialized = false;
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
SocketChannel createSock() throws IOException {
    // 建立一個SocketChannel物件,並設定非阻塞以及其它屬性
    SocketChannel sock;
    sock = SocketChannel.open();
    sock.configureBlocking(false);
    sock.socket().setSoLinger(false, -1);
    sock.socket().setTcpNoDelay(true);
    return sock;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
        throws IOException {
    // 將Socket註冊到Selector中,並生成唯一對應的SelectionKey物件
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // 進行Socket連線
    boolean immediateConnect = sock.connect(addr);
    // 如果第一次呼叫就已經連線上,則執行主要的連線操作
    if (immediateConnect) {
        // 這個方法前面已經介紹過了
        sendThread.primeConnection();
    }
}
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, 
        LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
    // 最多休眠waitTimeOut時間獲取NIO事件,呼叫wake()方法、有可讀IO事件和
    // 有OP_WRITE寫事件可觸發
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        // 獲取IO事件保定的SelectionKey物件
        selected = selector.selectedKeys();
    }
    // 更新now屬性為當前時間戳
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        // 先判斷SelectionKey事件是否是連線事件
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            // 如果是連線事件,則呼叫finishConnect()確保已連線成功
            if (sc.finishConnect()) {
                // 連線成功後更新發送時間
                updateLastSendAndHeard();
                // 執行主要的連線方法,準備傳送ZK的連線請求
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & 
                (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // 再判斷是否是OP_READ或者OP_WRITE事件
            // 如果滿足則呼叫doIO方法來處理對應的事件,doIO便是處理獲取的
            // IO事件核心方法
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
    }
    // 執行到這裡說明本次觸發的NIO事件已經全部執行完畢,但是有可能在途中會
    // 產生新的NIO事件需要執行,因此這裡會判斷是否有可傳送的Packet包,如果有
    // 則開啟OP_WRITE操作,以方便下次直接傳送
    if (sendThread.getZkState().isConnected()) {
        synchronized(outgoingQueue) {
            // 檢視是否有可傳送的Packet包資料
            if (findSendablePacket(outgoingQueue, cnxn.sendThread
                    .clientTunneledAuthenticationInProgress())!=null) {
                // 開啟OP_WRITE操作
                enableWrite();
            }
        }
    }
    // 清除SelectionKey集合
    selected.clear();
}
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
        ClientCnxn cnxn) throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    // 這裡有處理OP_READ型別的判斷,即處理ZK的Server端傳過來的請求
    // 在第一步中不會走到這裡面去,因此忽略
    if (sockKey.isReadable()) {
        ...
    }
    // 處理OP_WRITE型別事件,即處理要傳送到ZK的Server端請求包資料
    if (sockKey.isWritable()) {
        // 保證執行緒安全
        synchronized(outgoingQueue) {
            // 獲取最新的需要傳送的資料包,這裡獲取的便是前面SendThread
            // 放進去的只有ConnectRequest的Packet包物件
            Packet p = findSendablePacket(outgoingQueue, cnxn
                .sendThread.clientTunneledAuthenticationInProgress());
            if (p != null) {
                // 更新最後的傳送時間
                updateLastSend();
                // 如果Packet包的ByteBuffer為空則呼叫createBB()建立
                // 連線時ByteBuffer是一定為空的,因此這裡會一定進入
                if (p.bb == null) {
                    if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != OpCode.ping) &&
                        (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    // createBB方法的作用便是序列化請求並將byte[]陣列
                    // 新增到ByteBuffer中
                    p.createBB();
                }
                // 使用獲取的SocketChannel寫入含有序列化資料的ByteBuffer
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    // 傳送成功並刪除第一個Packet包物件
                    sentCount++;
                    outgoingQueue.removeFirstOccurrence(p);
                    // 如果requestHeader不為空,不是ping或者auth型別的
                    // 則將Packet包物件新增到pendingQueue中,代表這個
                    // 包物件正在被Server端處理且沒有響應回來
                    // (需要注意的是隻有連線時的ConnectRequest請求頭
                    // requestHeader才會為空,因此這裡的條件便是除了
                    // 新建連線、ping和auth型別的,其它都會被新增進來)
                    if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            // 如果outgoingQueue為空或者尚未連線成功且本次的Packet包物件
            // 已經發送完畢則關閉OP_WRITE操作,因此傳送ConnectReuqest請
            // 求後便需要等待Server端的相應確認建立連線,不允許Client端
            // 這邊主動傳送NIO資訊
            if (outgoingQueue.isEmpty()) {
                disableWrite();
            } else if (!initialized && p != null && 
                    !p.bb.hasRemaining()) {
                disableWrite();
            } else {
                // 為了以防萬一開啟OP_WRITE操作
                enableWrite();
            }
        }
    }
}
private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
        boolean clientTunneledAuthenticationInProgress) {
    synchronized (outgoingQueue) {
        // 判斷outgoingQueue是否為空
        if (outgoingQueue.isEmpty()) {
            return null;
        }
        // 兩種條件:
        // 如果第一個的ByteBuffer不為空
        // 如果傳入進來的clientTunneledAuthenticationInProgress為false
        // 引數為false說明認證尚未配置或者尚未完成
        if (outgoingQueue.getFirst().bb != null
            || !clientTunneledAuthenticationInProgress) {
            return outgoingQueue.getFirst();
        }
        // 跑到這裡說明認證已完成,需要遍歷outgoingQueue陣列,把連線的
        // 請求找到並放到佇列的第一個,以保證下次讀取會讀取到連線請求
        ListIterator<Packet> iter = outgoingQueue.listIterator();
        while (iter.hasNext()) {
            Packet p = iter.next();
            // 只有連線的requestHeader是空的,因此只需要判斷這個條件即可
            // 其它型別的包資料header肯定是不為空的
            if (p.requestHeader == null) {
                // 先刪除本包,隨後放到第一位
                iter.remove();
                outgoingQueue.add(0, p);
                return p;
            }
        }
        // 執行到這裡說明確實沒有包需要傳送
        return null;
    }
}

} ```

當Socket把請求資料已經序列化到ByteBuffer中的資料發出去後,Client端的第一步便已經完成。從這個流程中最關鍵的就是把OP_READ操作看成接收Server端的響應,而OP_WRITE則是Client主動發資料和Server端進行互動的操作,這樣在看程式碼理解時會更加輕鬆。

2.5 客戶端啟動流程圖

image.png

由於篇幅受限,剩下的兩個步驟(Server接收處理及響應和Client端接收Server端響應)在下篇文章分析,敬請期待。

參考文章

zookeeper3.7版本github原始碼註釋分析
## zk原始碼分析系列
Zookeeper原理和原始碼學習系列\ Zookeeper學習系列\ Zookeeper原始碼系列