Zookeeper原始碼篇5-新建連線互動流程原始碼分析(單機Server服務端與Client客戶端)

語言: CN / TW / HK

歡迎大家關注 github.com/hsfxuebao ,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈

經過上一篇文章的流程圖,對於ZK新建連線的大致流程應該瞭解的差不多了,接下來開始進行詳細的程式碼分析,同樣是三步走,在進行閱讀時可以根據前面的流程圖一步一步跟著原始碼走,這樣閱讀起來會更加的清晰方便。

需要注意的是,ZK的很多程式碼構成都是通過內部類完成的,因此等下分析原始碼時可能方法名不會按原始碼的方式組排,只是簡單的展示原始碼的大致流程和作用。

上篇文章分析了client端發起連線的原始碼分析新建連線互動流程分析(單機Server服務端與Client客戶端),本篇繼續看後面的兩個步驟。

2. Server端接收處理響應資料

其實在第一步呼叫SocketChannel.connect()方法時,第二步就已經接收新建連線的通訊並且生成了session資訊了,但為了便於理解,我們還是把第二步當成依賴於第一步。後面在原始碼會詳細說明。

2.1 NIOServerCnxnFactory接收NIO請求

NIOServerCnxnFactory負責使用Selector多路複用選擇器來從多個Client端獲取Socket的新建和傳送資料,因此在互動流程中,此類為Server端的起始點,也是通過執行緒輪詢的方式不斷地獲取其它Socket傳送的請求資料。

這裡面有幾個內部類如下:

```java private abstract class AbstractSelectThread extends ZooKeeperThread;

// 功能:該執行緒主要是接收來自客戶端的連線請求,並完成三次握手,建立tcp連線 private class AcceptThread extends AbstractSelectThread;

/* * 該執行緒接管連線完成的socket,接收來自該socket的命令處理命令,把處理結果返回給客戶端。 * 在主流程中,會呼叫select()函式來監控socket是否有讀和寫事件,若有讀和寫事件會呼叫handleIO(key)函式對事件進行處理。 / public class SelectorThread extends AbstractSelectThread;

/* * IOWorkRequest是一個小的包裝類,允許執行doIO()呼叫 * 使用WorkerService在連線上執行。 / private class IOWorkRequest extends WorkerService.WorkRequest

/* * 此執行緒負責關閉過時的連線,以便未建立會話的連線已正確過期。 / private class ConnectionExpirerThread extends ZooKeeperThread ``` AcceptThread 執行緒負責接收來自客戶端的連線,並將SocketChannel放入到SelectorThread的acceptedQueue佇列中。

SelectorThread 執行緒負責將讀寫事件交給workerPool.schedule(workRequest);處理,然後IOWorkRequest.doWork()方法處理,交給NIOServerCnxn.doIO()處理。詳細程式碼如下:

```java public class NIOServerCnxnFactory extends ServerCnxnFactory {

// NIO的Server端SocketChannel,可被多個SocketChannel連線併發送資料
ServerSocketChannel ss;
// NIO的多路複用選擇器
final Selector selector = Selector.open();
// 儲存某一IP和其IP下的所有NIO連線物件
final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
        new HashMap<InetAddress, Set<NIOServerCnxn>>( );
// 同一個IP下預設的最大客戶端連線數
int maxClientCnxns = 60;

private abstract class AbstractSelectThread extends ZooKeeperThread {

    protected final Selector selector;

    public AbstractSelectThread(String name) throws IOException {
        super(name);
        // Allows the JVM to shutdown even if this thread is still running.
        setDaemon(true);
        this.selector = Selector.open();
    }

    public void wakeupSelector() {
        selector.wakeup();
    }
   ...
}
// 功能:該執行緒主要是接收來自客戶端的連線請求,並完成三次握手,建立tcp連線
private class AcceptThread extends AbstractSelectThread {

    private final ServerSocketChannel acceptSocket;
    private final SelectionKey acceptKey;
    private final RateLogger acceptErrorLogger = new RateLogger(LOG);
    private final Collection<SelectorThread> selectorThreads;
    private Iterator<SelectorThread> selectorIterator;
    private volatile boolean reconfiguring = false;

    public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
        super("NIOServerCxnFactory.AcceptThread:" + addr);
        this.acceptSocket = ss;
        this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
        this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
        selectorIterator = this.selectorThreads.iterator();
    }

    // 在run()函式中實現執行緒的主要邏輯。在run()函式中主要呼叫select()函式。
    public void run() {
        try {
            while (!stopped && !acceptSocket.socket().isClosed()) {
                try {
                    //呼叫select,將連線加入佇列中
                    select();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
        } finally {
            closeSelector();
            // This will wake up the selector threads, and tell the
            // worker thread pool to begin shutdown.
            // 這將喚醒選擇器執行緒,並告訴工作執行緒池將開始關閉.
            if (!reconfiguring) {
                NIOServerCnxnFactory.this.stop();
            }
            LOG.info("accept thread exitted run method");
        }
    }

    public void setReconfiguring() {
        reconfiguring = true;
    }

    // 在select()函式中,會呼叫java的nio庫中的函式:
    // selector.select()對多個socket進行監控,看是否有讀、寫事件發生。若沒有讀、寫事件發生,該函式會一直阻塞。
    private void select() {
        try {
            selector.select();

            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                // 未獲取key即無讀寫事件發生,阻塞
                if (!key.isValid()) {
                    continue;
                }
                // 獲取到key,即有讀寫事件發生
                if (key.isAcceptable()) {
                    // todo
                    if (!doAccept()) {
                        // If unable to pull a new connection off the accept
                        // queue, pause accepting to give us time to free
                        // up file descriptors and so the accept thread
                        // doesn't spin in a tight loop.
                        // 如果無法從伺服器上拔出新連線,請接受
                        // 排隊,暫停接受,給我們自由時間
                        // 啟動檔案描述符,因此接受執行緒
                        // 不會在一個緊密的迴圈中旋轉。
                        pauseAccept(10);
                    }
                } else {
                    LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    /**
     * 若有能夠accepted事件發生,則呼叫doAccept()函式進行處理。在函式doAccept中,會呼叫socket的accept函式,來完成和客戶端的三次握手,建立起tcp
     * 連線。然後把已經完成連線的socket,設定成非阻塞:sc.configureBlocking(false);
     * 接下來選擇一個selector執行緒,並把連線好的socket新增到該selector執行緒的acceptedQueue佇列中。
     * 可見,accepted佇列是一個阻塞佇列,新增到該佇列後,就需要selector執行緒來接管已連線socket的後續的訊息,所以需要喚醒selector佇列。在addAcceptedConnection
     * 把已連線socket新增到阻塞佇列中後,呼叫wakeupSelector();喚醒對應的selector執行緒。
     */
    private boolean doAccept() {
        // 阻塞
        boolean accepted = false;
        SocketChannel sc = null;
        try {
            //完成和客戶端的三次握手,建立起tcp連線
            sc = acceptSocket.accept();
            //非阻塞
            accepted = true;
            if (limitTotalNumberOfCnxns()) {
                throw new IOException("Too many connections max allowed is " + maxCnxns);
            }
            InetAddress ia = sc.socket().getInetAddress();
            // 從ipMap中獲取IP對應的連線物件,並判斷是否超過了
            // 當前IP最大連線數量
            int cnxncount = getClientCnxnCount(ia);

            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                // 如果超過則拋異常提示已超過並關閉Socket連線
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }

            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
            // 設定成非阻塞
            sc.configureBlocking(false);

            // Round-robin assign this connection to a selector thread
            // 迴圈將此連線分配給選擇器執行緒
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();
            }
            SelectorThread selectorThread = selectorIterator.next();
            //喚醒對應的selector執行緒
            if (!selectorThread.addAcceptedConnection(sc)) {
                throw new IOException("Unable to add connection to selector queue"
                                      + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            // 接受,maxClientCnxns,配置阻止
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }

}
/**
 * 該執行緒接管連線完成的socket,接收來自該socket的命令處理命令,把處理結果返回給客戶端。
 * 在主流程中,會呼叫select()函式來監控socket是否有讀和寫事件,若有讀和寫事件會呼叫handleIO(key)函式對事件進行處理。
 */
public class SelectorThread extends AbstractSelectThread {

    private final int id;
    // 接收佇列,接收來自客戶端的連線請求
    private final Queue<SocketChannel> acceptedQueue;
    private final Queue<SelectionKey> updateQueue;

    public SelectorThread(int id) throws IOException {
        super("NIOServerCxnFactory.SelectorThread-" + id);
        this.id = id;
        acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
        updateQueue = new LinkedBlockingQueue<SelectionKey>();
    }
    public boolean addAcceptedConnection(SocketChannel accepted) {
        if (stopped || !acceptedQueue.offer(accepted)) {
            return false;
        }
        wakeupSelector();
        return true;
    }

    public void run() {
        try {
            while (!stopped) {
                try {
                    // todo
                    select();
                    processAcceptedConnections();
                    processInterestOpsUpdateRequests();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }

            // Close connections still pending on the selector. Any others
            // with in-flight work, let drain out of the work queue.
            for (SelectionKey key : selector.keys()) {
                NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                if (cnxn.isSelectable()) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                }
                cleanupSelectionKey(key);
            }
            SocketChannel accepted;
            while ((accepted = acceptedQueue.poll()) != null) {
                fastCloseSock(accepted);
            }
            updateQueue.clear();
        } finally {
            closeSelector();
            // This will wake up the accept thread and the other selector
            // threads, and tell the worker thread pool to begin shutdown.
            NIOServerCnxnFactory.this.stop();
            LOG.info("selector thread exitted run method");
        }
    }

    private void select() {
        try {
            selector.select();

            Set<SelectionKey> selected = selector.selectedKeys();
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
            // 隨機打亂已經獲取到的selectedList集合,至於為什麼要打亂
            // 估計是為了一定程度上保證各個Client端的請求都能被隨機處理
            Collections.shuffle(selectedList);
            Iterator<SelectionKey> selectedKeys = selectedList.iterator();
            // 獲取選擇key
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selected.remove(key);
                //如果key無效
                if (!key.isValid()) {
                    cleanupSelectionKey(key);
                    continue;
                }
                //擁有key且有可讀或者可寫事件
                if (key.isReadable() || key.isWritable()) {
                    // todo
                    handleIO(key);
                } else {
                    LOG.warn("Unexpected ops in select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    /**
     * 在handleIO中,會啟動woker執行緒池中的一個worker來處理這個事件,
     * 處理事件的主類是ScheduledWorkRequest,最終會呼叫run函式中的workRequest.doWork();來處理請求。
     *
     * 計劃與關聯的連線上處理的I/O
     * 給定的SelectionKey。如果未使用工作執行緒池,
     * I/O直接由該執行緒執行
     */
    private void handleIO(SelectionKey key) {
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

        // Stop selecting this key while processing on its
        // connection
        //在處理其連線時停止選擇此鍵
        cnxn.disableSelectable();
        key.interestOps(0);
        touchCnxn(cnxn);
        workerPool.schedule(workRequest);
    }
    private void processAcceptedConnections() {
        SocketChannel accepted;
        while (!stopped && (accepted = acceptedQueue.poll()) != null) {
            SelectionKey key = null;
            try {
                // 將Socket 註冊到Selector中生成SelectionKey
                key = accepted.register(selector, SelectionKey.OP_READ);
                // 生成對應的NIO連線物件
                NIOServerCnxn cnxn = createConnection(accepted, key, this);
                // 將連線物件和SelectionKey進行繫結
                key.attach(cnxn);
                // 這裡面會儲存IP和連線物件集合,一個IP對應著系列
                // 的連線物件,因為一臺機器可能有多個連線物件
                addCnxn(cnxn);
            } catch (IOException e) {
                // register, createConnection
                cleanupSelectionKey(key);
                fastCloseSock(accepted);
            }
        }
    }
}
/**
 * IOWorkRequest是一個小的包裝類,允許執行doIO()呼叫
 * 使用WorkerService在連線上執行。
 */
private class IOWorkRequest extends WorkerService.WorkRequest {

    private final SelectorThread selectorThread;
    private final SelectionKey key;
    private final NIOServerCnxn cnxn;

    IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
        this.selectorThread = selectorThread;
        this.key = key;
        this.cnxn = (NIOServerCnxn) key.attachment();
    }

    // 在IOWorkRequest.doWork()中會判斷key的合法性,
    // 然後呼叫NIOServerCnxn.doIO(key)來處理事件
    public void doWork() throws InterruptedException {
        //判斷key的合法性
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }

        if (key.isReadable() || key.isWritable()) {
            // todo
            cnxn.doIO(key);

            // Check if we shutdown or doIO() closed this connection
            // 檢查是否關閉或doIO()是否關閉了此連線
            if (stopped) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                return;
            }
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            touchCnxn(cnxn);
        }

        // Mark this connection as once again ready for selection
        //將此連線再次標記為可供選擇
        cnxn.enableSelectable();
        // Push an update request on the queue to resume selecting
        // on the current set of interest ops, which may have changed
        // as a result of the I/O operations we just performed.
        // 在佇列上推送更新請求以繼續選擇
        // 在當前感興趣的操作集上,可能已更改
        // 作為我們剛才執行的I/O操作的結果。
        if (!selectorThread.addInterestOpsUpdateRequest(key)) {
            cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
        }
    }
}

} ```

2.2 連線物件NIOServerCnxn

這個代表著Client端在Server端的連線物件,新連線在Server端的表現便是一個NIOServerCnxn物件。並且這個物件會和對應的SelectionKey、Socket進行繫結。這個類裡面最重要的便是doIO()方法,在這個方法中會判斷讀寫事件,並根據相應的值進行處理,在新建連線流程中,只會分析讀事件。關鍵原始碼如下: ``` public class NIOServerCnxn extends ServerCnxn { // 這三個物件便不用做過多介紹了 NIOServerCnxnFactory factory; final SocketChannel sock; private final SelectionKey sk; // 用來讀取請求長度的buffer物件 ByteBuffer lenBuffer = ByteBuffer.allocate(4); // 實際接受請求長度的buffer物件 ByteBuffer incomingBuffer = lenBuffer; // 是否已經初始化,預設值為false boolean initialized; private final ZooKeeperServer zkServer; // 本連線對應的sessionId,剛開始sessionId不會有,只有當ZK的Server端處理了 // ConnectRequest之後才會被賦值 long sessionId; // 寫操作使用的ByteBuffer集合 LinkedBlockingQueue outgoingBuffers; public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory) throws IOException { ... // 前面的賦值可以忽略,當建立本物件時將會預設開啟讀事件 sk.interestOps(SelectionKey.OP_READ); } void doIO(SelectionKey k) throws InterruptedException { try { // 進行操作前需要判斷Socket是否被關閉 if (isSocketOpen() == false) { return; } // 判斷讀事件 if (k.isReadable()) { // 從Socket中先讀取資料,注意的是incomingBuffer容量只有4位元組 int rc = sock.read(incomingBuffer); // 讀取長度異常 if (rc < 0) { throw new EndOfStreamException(); } // 讀取完畢開始進行處理 if (incomingBuffer.remaining() == 0) { boolean isPayload; // 當這兩個完全相等說明已經是下一次連線了,新建時無需分析 if (incomingBuffer == lenBuffer) { incomingBuffer.flip(); isPayload = readLength(k); incomingBuffer.clear(); } else { isPayload = true; } if (isPayload) { // 讀取具體連線的地方 readPayload(); } else { return; } } } // 寫事件型別 if (k.isWritable()) { // 如果ByteBuffer集合不為空才進入,新建連線時如果響應沒有一次性 // 傳送完剩餘的會被放在outgoingBuffers集合中依次傳送出去 if (outgoingBuffers.size() > 0) { // 給傳送的ByteBuffer物件分配空間,大小為64 * 1024位元組 ByteBuffer directBuffer = factory.directBuffer; directBuffer.clear(); for (ByteBuffer b : outgoingBuffers) { // 這裡執行的操作是把已經發送過的資料剔除掉 // 留下未傳送的資料擷取下來重新發送 if (directBuffer.remaining() < b.remaining()) { b = (ByteBuffer) b.slice().limit( directBuffer.remaining()); } int p = b.position(); // 將未傳送的資料放入directBuffer中 directBuffer.put(b); // 更新outgoingBuffers中的ByteBuffer物件屬性,以便 // 後續使用 b.position(p); // 如果directBuffer的空間都被佔用光了,則直接停止從 // outgoingBuffers集合中獲取 if (directBuffer.remaining() == 0) { break; } } directBuffer.flip(); // 傳送directBuffer中的資料 int sent = sock.write(directBuffer); ByteBuffer bb; // 這部分的迴圈便是再次判斷前面使用過的物件 // 看這些物件是否已經發送完,根據position資訊判斷如果傳送完 // 則從outgoingBuffers集合中移除 while (outgoingBuffers.size() > 0) { bb = outgoingBuffers.peek(); if (bb == ServerCnxnFactory.closeConn) { throw new CloseRequestException(); } // 獲取ByteBuffer的剩餘資料 int left = bb.remaining() - sent; // 如果到此大於0,說明前面的資料已經填充滿 // 直接退出迴圈 if (left > 0) { bb.position(bb.position() + sent); break; } // 執行到這裡說明ByteBuffer物件已經發送完畢,可以更新 // 傳送狀態並從將其從outgoingBuffers中移除 packetSent(); sent -= bb.remaining(); outgoingBuffers.remove(); } } synchronized(this.factory){ if (outgoingBuffers.size() == 0) { // 如果outgoingBuffers已經全部被消化完了便把 // OP_WRITE操作關閉 if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) { throw new CloseRequestException(); } sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE)); } else { // 如果還剩餘一些沒有傳送完,則繼續開啟OP_WRITE操作 // 接著下次輪詢傳送 sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } } } } // 異常處理忽略 ... } private void readPayload() throws IOException, InterruptedException { // 前面已經判斷過,這裡一定不會成立 if (incomingBuffer.remaining() != 0) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException(); } } if (incomingBuffer.remaining() == 0) { // 進行接收報文數量+1和更新Server端接收報文數量+1的操作 packetReceived(); incomingBuffer.flip(); // 第一次進來肯定是false if (!initialized) { // 因此這裡肯定會進入呼叫處理ConnectRequest的方法中 readConnectRequest(); } else { // 這裡是處理其它Request的方法,此次暫不分析,後續分析ping和 // 其它操作時再來分析此方法中的流程 readRequest(); } lenBuffer.clear(); // 處理完這次請求後再將incomingBuffer復原 incomingBuffer = lenBuffer; } } private void readConnectRequest() throws IOException, InterruptedException { if (zkServer == null) { throw new IOException("ZooKeeperServer not running"); } // 呼叫ZooKeeperServer的方法處理連線請求 zkServer.processConnectRequest(this, incomingBuffer); // 當前面執行完畢後說明已經初始化完成了 initialized = true; } }

```

2.3 單機執行的ZooKeeperServer

前面文章解釋過,這個類就是ZK的Server例項,每個ZK伺服器上對應著一個ZooKeeperServer例項,這裡面有諸多伺服器方面的屬性配置,但前面分析過,因此本次流程程式碼便不做過多的介紹了,有興趣的可以翻看前面的文章。

在Client端有ping心跳檢測間隔時間,在Server端有tickTime存活檢測時間,這兩個屬性代表的意思是不一樣的,Client端的ping心跳檢測間隔時間是輪詢隔一段時間後向Server端傳送ping請求,而Server端的tickTime間隔時間作用是每隔一段時間就判斷在Server端的Client連線物件是否已經死亡,如果已經過期死亡則將連線物件進行清除關閉。所以ping心跳檢測的意義是Client端告訴伺服器我還活著,tickTime意義是定期清除沒有告訴Server端還存活的連線。 ```java public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // 預設3S檢測一次客戶端存活情況 public static final int DEFAULT_TICK_TIME = 3000; // 實際設定的檢測存活時間間隔 protected int tickTime = DEFAULT_TICK_TIME; // Server端可接受的最小Client端sessionTimeout,如果未設定則值為tickTime2 protected int minSessionTimeout = -1; // Server端可接受的最大Client端sessionTimeout,如果未設定則值為tickTime20 protected int maxSessionTimeout = -1; // 處理客戶端請求RequestProcessor的第一個實現類物件 protected RequestProcessor firstProcessor; public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { BinaryInputArchive bia = BinaryInputArchive .getArchive(new ByteBufferInputStream(incomingBuffer)); // 反序列化ByteBuffer物件為ConnectRequest物件 ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); boolean readOnly = false; try { // 是否只可讀 readOnly = bia.readBool("readOnly"); cnxn.isOldClient = false; } catch (IOException e) { ... } // 只有ReadOnlyZooKeeperServer型別的Server只接收readOnly為true的 if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) { ... throw new CloseRequestException(msg); } // 獲取的zxid需要小於Server端最大的zxid if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { ... throw new CloseRequestException(msg); } // 這段程式碼便是Server和Client端協商具體的sessionTimeout值 // 1、獲取客戶端傳來的sessionTimeout int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); // 2、先判斷sessionTimeout是否小於Server端可接受的最小值 // 如果小於Server端可接受最小值則設定成Server端的最小sessionTimeout int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } // 3、再判斷sessionTimeout是否大於Server端可接受的最大值 // 如果大於Server端可接受最大值則設定成Server端的最大sessionTimeout int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } // 最後把滿足協商範圍的sessionTimeout設定到Client連線物件中 cnxn.setSessionTimeout(sessionTimeout); // 設定該連線物件不再從Client端接收資料 cnxn.disableRecv(); long sessionId = connReq.getSessionId(); // 第一次連線不手動設定sessionId都是0 if (sessionId != 0) { // 如果不是0則需要關閉原來的session並且重新開啟sessionId // 這種情況不常見,只需要知道處理的程式碼邏輯在這裡便行,暫不詳細分析 long clientSessionId = connReq.getSessionId(); serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } else { // 開始建立新的session資訊 createSession(cnxn, passwd, sessionTimeout); } } long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { // 根據失效時間建立一個新的session資訊並返回唯一ID long sessionId = sessionTracker.createSession(timeout); // 設定失效時間和sessionId Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); // 呼叫該方法使用剛剛獲取到的屬性去生成Request請求 submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); return sessionId; } private void submitRequest(ServerCnxn cnxn, long sessionId, int type, int xid, ByteBuffer bb, List authInfo) { // 根據引數生成Request物件,並呼叫submitRequest()方法開始使用 // RequestProcessor鏈對Request進行處理 Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo); submitRequest(si); } public void submitRequest(Request si) { // 這個方法功能很簡單: // 1、判斷Server端是否初始化完成,如果未完成則一直持續等待 // 2、在呼叫RequestProcessor鏈前先更新session在Server端的過期時間 // 3、呼叫firstProcessor物件的processRequest方法開始處理請求 if (firstProcessor == null) { synchronized (this) { try { // 一直輪詢直到Server端的各種元件初始化完成 while (state == State.INITIAL) { wait(1000); } } ... // 如果未初始化成功則丟擲異常 if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } // todo 將請求放到requestThrottler的submittedRequests佇列中 requestThrottler.submitRequest(si); } public void submitRequest(Request request) { if (stopping) { LOG.debug("Shutdown in progress. Request cannot be processed"); dropRequest(request); } else { request.requestThrottleQueueTime = Time.currentElapsedTime(); // todo submittedRequests.add(request); } } 將請求放到requestThrottler的submittedRequests佇列中,然後在requestThrottler 的run()方法中呼叫zks.submitRequestNow(request);

public void submitRequestNow(Request si) {
    .. 
    try {
        // 更新session的過期時間
        touch(si.cnxn);
        // 校驗請求型別是否有效
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            setLocalSessionFlag(si);
            // todo 開始呼叫firstProcessor物件的processRequest()方法處理請求
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type {}", si.type);
            // Update request accounting/throttling limits
            requestFinished(si);
            // 如果處理型別校驗不通過則傳送無法處理請求並關閉連線
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } 
    ...
}
void touch(ServerCnxn cnxn) throws MissingSessionException {
    if (cnxn == null) {
        return;
    }
    long id = cnxn.getSessionId();
    int to = cnxn.getSessionTimeout();
    // 獲取sessionId和sessionTimeout屬性呼叫sessionTracker去更新session
    // 在Server端的過期時間
    if (!sessionTracker.touchSession(id, to)) {
        throw new MissingSessionException();
    }
}

} ```

2.4 session追蹤類SessionTracker

取名為SessionTracker,實際上這個類的功能就是維護session生命週期,主要進行session過期判斷和更新session狀態的操作,判斷session過期還是放到後面分析ping流程再看吧,新建連線時就看其如何更新session狀態。

```java public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker { // 儲存sessionId和對應的Session物件 HashMap sessionsById;

HashMap<Long, SessionSet> sessionSets;
// key為sessionId,value為這個session的過期時間
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
// 下一次新建session時的id
long nextSessionId = 0;

public long createSession(int sessionTimeout) {
    long sessionId = nextSessionId.getAndIncrement();
    // 在使用RequestProcessor處理請求前會呼叫該方法為客戶端建立一個session
    trackSession(sessionId, sessionTimeout);
    return sessionId;
}

@Override
public synchronized boolean trackSession(long id, int sessionTimeout) {
    boolean added = false;

    // 如果沒有儲存對應的Session物件則建立一個並新增
    SessionImpl session = sessionsById.get(id);
    if (session == null) {
        session = new SessionImpl(id, sessionTimeout);
    }

    // findbugs2.0.3 complains about get after put.
    // long term strategy would be use computeIfAbsent after JDK 1.8
    SessionImpl existedSession = sessionsById.putIfAbsent(id, session);

    if (existedSession != null) {
        session = existedSession;
    } else {
        added = true;
        LOG.debug("Adding session 0x{}", Long.toHexString(id));
    }

    if (LOG.isTraceEnabled()) {
        String actionStr = added ? "Adding" : "Existing";
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.SESSION_TRACE_MASK,
            "SessionTrackerImpl --- " + actionStr
            + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
    }
    // 新增完session後更新session的過期時間
    updateSessionExpiry(session, sessionTimeout);
    return added;

private void updateSessionExpiry(SessionImpl s, int timeout) {
    logTraceTouchSession(s.sessionId, timeout, "");
    sessionExpiryQueue.update(s, timeout);
}

// 當client與server有互動時(連線請求/讀寫操作/心跳),該方法就會被呼叫
// 當zk server啟動時會將磁碟中的session恢復到記憶體,也會呼叫該方法
// 該方法在做的是會話換桶操作
public Long update(E elem, int timeout) {
    // elemMap集合的key為session,value為該session的過期時間,
    // 即該session當前所在的會話桶id
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    // 計算本次互動應該將會話放入到哪個會話桶
    Long newExpiryTime = roundToNextInterval(now + timeout);

    // 若之前所在會話桶id與本次互動計算的會話桶id相同,
    // 則無需換桶,即什麼也不用做
    if (newExpiryTime.equals(prevExpiryTime)) {
        // No change, so nothing to update
        return null;
    }

    // ---------- 程式碼能走到這裡,說明需要換桶了。 --------------
    // 換桶由兩步操作完成:將會話放入到新桶;將會話從老桶中清除

    // First add the elem to the new expiry time bucket in expiryMap.
    // 從會話桶集合中獲取當前的會話桶,若為null,則建立一個新的會話桶
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        // 建立會話桶set
        set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        // 將新建的會話桶放入到會話桶集合
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    // 將會話放入到會話桶
    set.add(elem);

    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    // 將會話與會話桶id的對應關係放入到elemMap,並獲取到該會話之前所在的會話桶id
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    // 若當前會話桶id與之前會話桶id不相同,說明需要換桶。
    // 而前面已經將會話放到了新的會話桶,所以這裡要將會話從老桶中清除
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        // 獲取到之前的會話桶
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            // 將會話從老會話桶中清除
            prevSet.remove(elem);
        }
    }
    // 返回當前互動引發的會話所在的會話桶id,
    // 即當前會話的真正過期時間點
    return newExpiryTime;
}

} ```

2.5 RequestProcessor請求處理鏈

前面介紹過,在單機執行時RequestProcessor處理鏈只有三個:PrepRequestProcessor、SyncRequestProcessor和FinalRequestProcessor,其中前兩個是執行緒物件,最後一個是普通的物件,至於原因前面的文章介紹過。接下來的三個RequestProcessor大致作用不做分析,有興趣可以看下以前的文章。

2.5.1 PrepRequestProcessor

```java public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { // 本RequestProcessor中用來暫時儲存需要處理的Request,輪詢獲取請求處理 LinkedBlockingQueue submittedRequests = new LinkedBlockingQueue(); // 本RequestProcessor的下一個RequestProcessor物件 RequestProcessor nextProcessor; ZooKeeperServer zks; @Override public void processRequest(Request request) { // RequestProcessor的實現方法,由於內部使用輪詢方式從submittedRequests // 集合獲取資料,因此在這裡直接把Request新增到集合中即可 submittedRequests.add(request); } @Override public void run() { try { while (true) { // 輪詢從submittedRequests集合中獲取Request物件 Request request = submittedRequests.take(); // 如果requestOfDeath代表ZK已經關閉,因此退出迴圈 if (Request.requestOfDeath == request) { break; } // 開始處理正常的Request pRequest(request); } }... } protected void pRequest(Request request) throws RequestProcessorException {

    request.setHdr(null);
    request.setTxn(null);

    if (!request.isThrottled()) {
        // todo
      pRequestHelper(request);
    }  
    request.zxid = zks.getZxid();
    // 呼叫下個RequestProcessor來處理Request
    nextProcessor.processRequest(request);
}

private void pRequestHelper(Request request) throws RequestProcessorException {
    try {
        switch (request.type) {
        ...
        case OpCode.createSession:
        case OpCode.closeSession:
            if (!request.isLocalSession()) {
                // 直接處理事務
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
            }
            break;
            ...
       }
   }
}

protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
    if (request.getHdr() == null) {
        // 為請求建立事務頭TxnHeader物件
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                Time.currentWallTime(), type));
    }

    switch (type) {
    ...
    // 建立session
    case OpCode.createSession:
        request.request.rewind();
        // 此時的to實際上就是sessionTimeout
        int to = request.request.getInt();
        // 使用sessionTimeout建立CreateSessionTxn物件
        request.setTxn(new CreateSessionTxn(to));
        request.request.rewind();
        // only add the global session tracker but not to ZKDb
        // 根據sessionid和sessionTimeout再次新增session資訊
        zks.sessionTracker.trackSession(request.sessionId, to);
        zks.setOwner(request.sessionId, request.getOwner());
        break;
    ...
    }
}

protected void pRequest2Txn(int type, long zxid, Request request, 
        Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException{
    // 為請求建立事務頭TxnHeader物件
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                zks.getTime(), type);
    switch (type) {
        // 無關的case情況忽略
        ...
        case OpCode.createSession:
            request.request.rewind();
            // 此時的to實際上就是sessionTimeout
            int to = request.request.getInt();
            // 使用sessionTimeout建立CreateSessionTxn物件
            request.txn = new CreateSessionTxn(to);
            request.request.rewind();
            // 根據sessionid和sessionTimeout再次新增session資訊
            zks.sessionTracker.addSession(request.sessionId, to);
            zks.setOwner(request.sessionId, request.getOwner());
            break;
        ...
}

} ```

2.5.2 SyncRequestProcessor

```java public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { // 本RequestProcessor中用來暫時儲存需要處理的Request,輪詢獲取請求處理 private final LinkedBlockingQueue queuedRequests = new LinkedBlockingQueue(); // 儲存的是已經被寫入磁碟但是待重新整理的事務 private final LinkedList toFlush = new LinkedList(); // 本RequestProcessor的下一個RequestProcessor物件 private final RequestProcessor nextProcessor; // Server端快照的數量 private static int snapCount = ZooKeeperServer.getSnapCount(); // 在回滾前的log數量,隨機生成的 private static int randRoll; public void processRequest(Request request) { // 類似於PrepRequestProcessor,內部使用輪詢方式從submittedRequests // 集合獲取資料,因此在這裡直接把Request新增到集合中即可 queuedRequests.add(request); } @Override public void run() { try { int logCount = 0; // 避免服務都在同一時間獲取快照snapshot,這裡面設定的是randRoll屬性 setRandRoll(r.nextInt(snapCount/2)); while (true) {

        long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
        // 從queuedRequests獲取Request
        Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);

            // 如果已經結束則退出迴圈
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // 將Request寫入到log中
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    // 如果日誌的數量大於某個臨界點,則生成一次快照
                    if (logCount > (snapCount / 2 + randRoll)) {
                        // 途中會非同步生成快照,過程忽略,操作完之後
                        // logCount 歸零
                        ...
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // 如果所有的事務都處理完則使用nextProcessor
                    // 開始進行下一步處理
                    if (nextProcessor != null) {
                        // 進行處理
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
                // 如果前面兩個條件都不滿足,則把Request新增到待重新整理的
                // 事務集合中
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    // 當待刷事務到達了1000個,則把集合中的所有事務全都
                    // 刷掉並使用nextProcessor依次進行處理
                    flush(toFlush);
                }
            }
        }
    } ...
}

} ```

2.5.2 FinalRequestProcessor

```java public class FinalRequestProcessor implements RequestProcessor { ZooKeeperServer zks;

public void processRequest(Request request) {

    // 直接開始處理Request請求
    ProcessTxnResult rc = null;
    if (!request.isThrottled()) {
      rc = applyRequest(request);
    }
    // 如果執行到這裡連線物件還為空則直接退出
    if (request.cnxn == null) {
        return;
    }
    ServerCnxn cnxn = request.cnxn;

    long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();

    String lastOp = "NA";
    // Notify ZooKeeperServer that the request has finished so that it can
    // update any request accounting/throttling limits
    // 執行中的數量減一
    zks.decInProcess();
    zks.requestFinished(request);
    Code err = Code.OK;
    Record rsp = null;
    String path = null;
    int responseSize = 0;
    try {
        // 如果發生了異常則直接丟擲
        if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
            AuditHelper.addAuditLog(request, rc, true);

            // 如果是單個的操作發生了異常丟擲
            if (request.getException() != null) {
                throw request.getException();
            } else {
                throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
            }
        }

        ...
        // 開始根據Request的操作型別進行相應的處理
        switch (request.type) {
        ...
        case OpCode.createSession: {
            // 最後的操作型別
            lastOp = "SESS";
            // 更新狀態
            updateStats(request, lastOp, lastZxid);
            // 最後呼叫這個方法來完成session的初始化以及響應
            zks.finishSessionInit(request.cnxn, true);
            // 直接退出方法
            return;
        }
       }
   }
}

}

2.6 ZooKeeperServer新建連線生成響應物件 又再次回到了ZooKeeperServer類中,這裡面執行了Server端針對新建連線的最後響應,其實我也搞不懂為什麼要把新建連線單獨的抽出來放到ZooKeeperServer類中來,或許唯一能解釋的便是方便處理已存在session重新建立這個流程。

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public void finishSessionInit(ServerCnxn cnxn, boolean valid) { // 使用JMX監控註冊連線物件cnxn try { // valid指的是是否成功建立session資訊 if (valid) { serverCnxnFactory.registerConnection(cnxn); } }... try { // 如果valid為true,則使用cnxn連線物件的sessionTimemout,否則為0 // 如果valid為true,則使用cnxn連線物件的ssessionId,否則為0 // 如果valid為true,則使用cnxn連線物件的ssessionId生成密碼,否則空 ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout() : 0, valid ? cnxn.getSessionId() : 0, valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]); // 生成響應的位元組物件 ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); bos.writeInt(-1, "len"); rsp.serialize(bos, "connect"); if (!cnxn.isOldClient) { bos.writeBool( this instanceof ReadOnlyZooKeeperServer, "readOnly"); } baos.close(); // 根據剛剛生成的位元組陣列申城ByteBuffer ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.remaining() - 4).rewind(); // 傳送ByteBuffer物件內容 cnxn.sendBuffer(bb); // 如果valid失效則關掉連線
if (!valid) { cnxn.sendBuffer(ServerCnxnFactory.closeConn); } else { // 如果成功則確保能讀取到Client端傳送過來的資料 cnxn.enableRecv(); } } catch (Exception e) { cnxn.close(); } } } 2.7 NIOServerCnxn傳送新建連線響應 執行到這一步已經到了新建連線的尾聲了,這一步只有傳送ByteBuffer物件的資料,其它的操作相對而言並不是很重要。

public class NIOServerCnxn extends ServerCnxn { public void sendBuffer(ByteBuffer bb) { try { // 只有非關閉連線的操作才能使用Socket傳送資料 if (bb != ServerCnxnFactory.closeConn) { // 確保SelectionKey的OP_WRITE沒有被開啟,以確保等下wake喚醒 // Selector可以進行重試 if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) { try { // 傳送快取資料 sock.write(bb); } catch (IOException e) { } } if (bb.remaining() == 0) { // 如果快取資料傳送完畢則更新ZK的Server狀態 packetSent(); return; } } // 如果跑到這裡說明ByteBuffer並未全部發送,因此需要喚醒Selector // 把剩餘的ByteBuffer資料傳送出去 synchronized(this.factory){ sk.selector().wakeup(); // 新增到outgoingBuffers集合中交給doIO()方法裡面的write方法 // 型別處理,該邏輯在前面已經分析過了,可以直接回頭看 outgoingBuffers.add(bb); if (sk.isValid()) { // 將OP_WRITE開啟 sk.interestOps( sk.interestOps() | SelectionKey.OP_WRITE); } } } } // hdr和txn都是和連線相關的物件,裡面的方法執行的操作為新增 // session資訊,到這裡已經是新建連線的第三次呼叫新增session資訊 // 當然這裡面還會呼叫DataTree.processTxn()方法,只是不會執行 // 很重要的邏輯程式碼 public ProcessTxnResult processTxn(Request request) { TxnHeader hdr = request.getHdr(); processTxnForSessionEvents(request, hdr, request.getTxn());

    final boolean writeRequest = (hdr != null);
    final boolean quorumRequest = request.isQuorum();

    // return fast w/o synchronization when we get a read
    if (!writeRequest && !quorumRequest) {
        return new ProcessTxnResult();
    }
    synchronized (outstandingChanges) {
        //
        ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());

        // request.hdr is set for write requests, which are the only ones
        // that add to outstandingChanges.
        if (writeRequest) {
            long zxid = hdr.getZxid();
            // 新建連線流程outstandingChanges是空的,因此這裡的迴圈邏輯暫不分析
            while (!outstandingChanges.isEmpty()
                    && outstandingChanges.peek().zxid <= zxid) {
                ChangeRecord cr = outstandingChanges.remove();
                ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                if (cr.zxid < zxid) {
                    LOG.warn(
                        "Zxid outstanding 0x{} is less than current 0x{}",
                        Long.toHexString(cr.zxid),
                        Long.toHexString(zxid));
                }
                if (outstandingChangesForPath.get(cr.path) == cr) {
                    outstandingChangesForPath.remove(cr.path);
                }
            }
        }

        // do not add non quorum packets to the queue.
        if (quorumRequest) {
            getZKDatabase().addCommittedProposal(request);
        }
        return rc;
    }
}

} ```

3. Client端接收響應

當第二步走完後便進入到了第三步Client接收Server端響應並呼叫監聽器的步驟了。

3.1 SendThread接收通知

前面已經說了,SendThread負責傳送和接收包資料,當Server端傳送了新建連線響應後該類就會接收並進行相應的處理。本次分析只會分析經過的邏輯部分,其它的邏輯不做分析。 java class SendThread extends ZooKeeperThread { @Override public void run() { ... while (state.isAlive()) { try { ... // 還是老地方,呼叫doTransport()方法處理NIO的事件 clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); } } ... } }

3.2 ClientCnxnSocketNIO處理讀事件

這次進入到該類處理的便是OP_READ型別的NIO事件。 java public class ClientCnxnSocketNIO extends ClientCnxnSocket { @Override void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { // 老邏輯,不再分析 selector.select(waitTimeOut); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // 針對客戶端的響應均會進入到該方法中 doIO(pendingQueue, outgoingQueue, cnxn); } } // 後面略 ... } void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } // 開始處理讀事件 if (sockKey.isReadable()) { // 從Socket中讀取資料 int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException(); } // incomingBuffer已經讀取完畢 if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount++; readLength(); } else if (!initialized) { // 新建連線將會跑到這裡來,因為此時Client端的initialized // 還是為false,尚未初始化完成 // 開始讀取連線響應結果 readConnectResult(); // 開啟Socket的OP_READ操作 enableRead(); // 檢視outgoingQueue佇列是否有可讀包資料 if (findSendablePacket(outgoingQueue, cnxn.sendThread .clientTunneledAuthenticationInProgress())!=null){ // 如果有的話則開啟OP_WRITE操作,準備下次輪詢時處理 // 寫事件 enableWrite(); } // 設定initialized屬性初始化完成並更新lastHeard屬性 lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { // 這裡是當新建連線成功後普通的操作響應處理邏輯 sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } // 後面的處理寫事件忽略 } void readConnectResult() throws IOException { // 使用讀取到的ByteBuffer物件反序列化得到ConnectResponse響應 ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ConnectResponse conRsp = new ConnectResponse(); conRsp.deserialize(bbia, "connect"); boolean isRO = false; try { // 讀取readOnly屬性 isRO = bbia.readBool("readOnly"); }... // 開始進行連線成功的操作 this.sessionId = conRsp.getSessionId(); sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO); } }

3.3 ClientCnxn處理連線成功

執行到這裡基本上就已經算成功了,接下來的事情便是觸發ZK的監聽器。 java public class ClientCnxn { void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException { // _negotiatedSessionTimeout便是Client端和Server端互相協商獲得的 // sessionTimeout過期時間 negotiatedSessionTimeout = _negotiatedSessionTimeout; // 時間小於等於0說明連線失敗了 if (negotiatedSessionTimeout <= 0) { state = States.CLOSED; // 傳送ZK過期事件 eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); // 並且傳送停止服務事件 eventThread.queueEventOfDeath(); throw new SessionExpiredException(warnInfo); } // 接下來便是設值了,具體的值在這裡都可以看到 readTimeout = negotiatedSessionTimeout * 2 / 3; connectTimeout = negotiatedSessionTimeout / hostProvider.size(); hostProvider.onConnected(); sessionId = _sessionId; sessionPasswd = _sessionPasswd; // 根據Server端傳來的屬性設值狀態 state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED; seenRwServerBefore |= !isRO; // 確定等下要傳送的事件型別 KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; // 使用EventThread執行緒物件釋出監聽事件 eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, eventState, null)); } }

3.4 EventThread監聽事件

前面說過SendThread負責和ZK的Server端進行互動,完成傳送資料包和接收響應的任務,而EventThread則是根據SendThread接收到響應型別產生的事件型別進行輪詢處理。也就是說SendThread負責和Server端對接,EventThread則是負責和SendThread對接,處理Client自己產生的ZK事件。 java class EventThread extends ZooKeeperThread { // 將要處理的ZK事件集合 private final LinkedBlockingQueue<Object> waitingEvents; // 客戶端的Watcher管理類 private final ClientWatchManager watcher; public void queueEvent(WatchedEvent event) { // SendThread就是呼叫這個方法將對應的ZK事件傳入進來開始ZK事件的生命週期 // 如果session狀態和當前一樣且事件型別沒有則直接退出,無需處理 if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); // 使用傳入的ZK事件和ClientWatchManager生成事件和監聽器的繫結物件 WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // 將事件和監聽器的繫結物件新增到waitingEvents集合中,這個集合型別只 // 會是WatcherSetEventPair或者Packet waitingEvents.add(pair); } @Override public void run() { try { isRunning = true; while (true) { // 輪詢waitingEvents集合,取出其中的事件物件 Object event = waitingEvents.take(); // eventOfDeath為關閉事件 if (event == eventOfDeath) { wasKilled = true; } else { // 不是關閉事件則開始處理事件 processEvent(event); } if (wasKilled) { synchronized (waitingEvents) { // 如果是關閉事件則會等waitingEvents全部處理之後再把 // EventThread設定為停止執行且退出迴圈 if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } }// 異常處理忽略 ... } private void processEvent(Object event) { try { if (event instanceof WatcherSetEventPair) { // 如果是正常的WatcherSetEventPair型別則直接取出裡面所有的 // 監聽器傳入繫結的事件依次執行,這個步驟便是對應我們自己開發 // 的Watcher回撥 WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); }... } }// 後面是針對Packet事件型別進行的處理,回撥型別是非同步回撥 ... }// 異常處理忽略 ... } } 執行到這裡新建連線的流程已經執行完畢了,接下來看下ClientWatchManager是如何將ZK的事件和Watcher進行繫結的。

3.5 ClientWatchManager監聽器管理類

這個類會管理四種邏輯型別的監聽器,至於具體的型別可以看以前的文章。接下來簡單的看下其materialize方法的實現。

java private static class ZKWatchManager implements ClientWatchManager { private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>(); private volatile Watcher defaultWatcher; @Override public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { // 將要返回的監聽器集合 Set<Watcher> result = new HashSet<Watcher>(); switch (type) { case None: // 新建連線相關的事件型別都是None,不管是連線成功還是連線失敗超時 // 將預設監聽器defaultWatcher新增到result中,這也就是為什麼在 // 新建ZooKeeper連線時傳入Watcher新建連線相關的事件這個都會收到 result.add(defaultWatcher); // 判斷是否使用完之後刪除,需要開關開啟且ZK狀態不是SyncConnected boolean clear = ClientCnxn.getDisableAutoResetWatch() && state != Watcher.Event.KeeperState.SyncConnected; // 將dataWatches中的監聽器新增到result集合中 synchronized(dataWatches) { for(Set<Watcher> ws: dataWatches.values()) { result.addAll(ws); } if (clear) { // 如果需要刪除則把快取的全刪了 dataWatches.clear(); } } // 後面的其它兩種都是同樣的操作,略過 return result; case NodeDataChanged: case NodeCreated: // 節點變更型別事件,只有dataWatches和existWatches會參與 synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { addTo(existWatches.remove(clientPath), result); } break; case NodeChildrenChanged: // 子節點變更事件,只有childWatches參與 synchronized (childWatches) { addTo(childWatches.remove(clientPath), result); } break; case NodeDeleted: // 節點被刪除三種類型都會受到影響,操作方式和前面類似直接略過 synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } ... break; default: throw new RuntimeException(msg); } return result; } }

不得不說這是一個龐大的工程量,閱讀完ZK的原始碼後對平時使用以及某些配置都有更加深刻的理解了,只是對於ZK的ByteBuffer空間大小的4位元組分配還有些犯迷糊。後續再補回來。

能耐心看到這裡的想必也是決定了把ZK琢磨透的秀兒吧。

參考文章

zookeeper3.7版本github原始碼註釋分析
## zk原始碼分析系列
Zookeeper原理和原始碼學習系列\ Zookeeper學習系列\ Zookeeper原始碼系列