ZooKeeper系列:zk中的watch

語言: CN / TW / HK

theme: channing-cyan

Watch就是監聽,觀察。

其實就是客戶端註冊watch,然後服務端發生節點資料變化的時候會觸發watch事件,接著回撥客戶端

建立工程和實現類

建立java 的maven工程,然後在pom中新增對應的zookeeper的maven資訊,其版本需要和安裝的zookeeper版本一致

<dependency>    <groupId>org.apache.zookeeper</groupId>    <artifactId>zookeeper</artifactId>    <version>3.8.0</version> </dependency>

與zookeeper叢集建立連線,其中使用了三個引數:

  • connectString: 連線地址, 叢集中各個節點的地址和埠,使用逗號隔開。
  • sessionTimeout: 超時時間,session的超時
  • watcher: 監聽,在session中註冊監聽,此 watcher是session級別的

連線配置以及watch相關的使用參考下面的程式碼

java public static void main(String[] args) throws IOException, KeeperException, InterruptedException {        CountDownLatch countDownLatch = new CountDownLatch(1);        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 3000, new Watcher() {            @Override            public void process(WatchedEvent watchedEvent) {                Event.KeeperState state = watchedEvent.getState();                System.out.println(watchedEvent.toString()); ​                switch (state) {                    case Unknown:                        break;                    case Disconnected:                        break;                    case NoSyncConnected:                        break;                    case SyncConnected:                        System.out.println("連線成功:connected");                        countDownLatch.countDown();                        break;                    case AuthFailed:                        break;                    case ConnectedReadOnly:                        break;                    case SaslAuthenticated:                        break;                    case Expired:                        break;                    case Closed:                        break;               }           }       }); ​        try {            countDownLatch.await();       } catch (InterruptedException e) {            e.printStackTrace();       }        ZooKeeper.States states = zk.getState(); ​        switch (states) {            case CONNECTING:                System.out.println("CONNECTING......");                break;            case ASSOCIATING:                break;            case CONNECTED:                System.out.println("CONNECTED......");                break;            case CONNECTEDREADONLY:                break;            case CLOSED:                break;            case AUTH_FAILED:                break;            case NOT_CONNECTED:                break;       } ​        zk.create("/name","紀先生".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);        Stat stat = new Stat();        zk.getData("/name", new Watcher() {            @Override            public void process(WatchedEvent watchedEvent) {                System.out.println("getData watch..."+ watchedEvent.toString());                try {                    zk.getData("/name",true,stat); //                   zk.getData("/name",this,stat);               } catch (KeeperException e) {                    e.printStackTrace();               } catch (InterruptedException e) {                    e.printStackTrace();               }           }       },stat);        Stat stat1 = zk.setData("/name","無先生".getBytes(),0);        zk.setData("/name","有先生".getBytes(),stat1.getVersion()); ​   }

總共有兩種watch:

第一類:new Zookeeper時候傳入的watch,此watch是session級別的,跟path,node沒關係

第二類:getData的時候傳入的watch,此watch是和path,node相關的,當path發生變化的時候會觸發watch。

watch的特點:

  1. watch的註冊只發生在讀型別的呼叫,如getData,exists,getChildren
  2. watch的註冊是一次性的,如果想每次操作都有回撥,需要每次讀操作都要註冊一次

例如:

註冊watch除了直接new watch之外,還有下面兩種簡單的方式

方式一:程式碼中的zk.getData("/name",true,stat),其中的true引數就是代表註冊進去的watch是default watch,即new zk的那個watch。

方式二:zk.getData("/name",this,stat);,這裡的this代表的是父級的watch,就是它外層註冊的watch。

java zk.getData("/name", new Watcher() {            @Override            public void process(WatchedEvent watchedEvent) {                System.out.println("getData watch..."+ watchedEvent.toString());                try {                    zk.getData("/name",true,stat);               } catch (KeeperException e) {                    e.printStackTrace();               } catch (InterruptedException e) {                    e.printStackTrace();               }           }       },stat);

如果不在watch中新增一個zk.getData("/name",true,stat),那麼只有在第一次setData的時候會觸發設定的watch,第二次setData的時候則不會(watch 是一次性的)

為什麼會使用CountDownLatch

仔細點話會發現程式碼中增加countDownLatch,這個是為了檢測zk建立成功之後才會返回連線成功(CONNECTED),zk的啟動是非同步的,new之後立即獲取zk的狀態是CONNECTING的,無法正常使用。

我是紀先生,用輸出倒逼輸入而持續學習,持續分享技術系列文章,以及全網值得收藏好文,歡迎關注公眾號,做一個持續成長的技術人。