ZooKeeper系列:zk中的watch
theme: channing-cyan
Watch就是監聽,觀察。
其實就是客戶端註冊watch,然後服務端發生節點資料變化的時候會觸發watch事件,接著回撥客戶端
建立工程和實現類
建立java 的maven工程,然後在pom中新增對應的zookeeper的maven資訊,其版本需要和安裝的zookeeper版本一致
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
與zookeeper叢集建立連線,其中使用了三個引數:
- connectString: 連線地址, 叢集中各個節點的地址和埠,使用逗號隔開。
- sessionTimeout: 超時時間,session的超時
- watcher: 監聽,在session中註冊監聽,此 watcher是session級別的
連線配置以及watch相關的使用參考下面的程式碼
java
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 3000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
System.out.println(watchedEvent.toString());
switch (state) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
System.out.println("連線成功:connected");
countDownLatch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
}
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
ZooKeeper.States states = zk.getState();
switch (states) {
case CONNECTING:
System.out.println("CONNECTING......");
break;
case ASSOCIATING:
break;
case CONNECTED:
System.out.println("CONNECTED......");
break;
case CONNECTEDREADONLY:
break;
case CLOSED:
break;
case AUTH_FAILED:
break;
case NOT_CONNECTED:
break;
}
zk.create("/name","紀先生".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Stat stat = new Stat();
zk.getData("/name", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("getData watch..."+ watchedEvent.toString());
try {
zk.getData("/name",true,stat);
// zk.getData("/name",this,stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},stat);
Stat stat1 = zk.setData("/name","無先生".getBytes(),0);
zk.setData("/name","有先生".getBytes(),stat1.getVersion());
}
總共有兩種watch:
第一類:new Zookeeper時候傳入的watch,此watch是session級別的,跟path,node沒關係
第二類:getData的時候傳入的watch,此watch是和path,node相關的,當path發生變化的時候會觸發watch。
watch的特點:
- watch的註冊只發生在讀型別的呼叫,如getData,exists,getChildren
- watch的註冊是一次性的,如果想每次操作都有回撥,需要每次讀操作都要註冊一次
例如:
註冊watch除了直接new watch之外,還有下面兩種簡單的方式
方式一:程式碼中的zk.getData("/name",true,stat),其中的true引數就是代表註冊進去的watch是default watch,即new zk的那個watch。
方式二:zk.getData("/name",this,stat);,這裡的this代表的是父級的watch,就是它外層註冊的watch。
java
zk.getData("/name", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("getData watch..."+ watchedEvent.toString());
try {
zk.getData("/name",true,stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},stat);
如果不在watch中新增一個zk.getData("/name",true,stat),那麼只有在第一次setData的時候會觸發設定的watch,第二次setData的時候則不會(watch 是一次性的)
為什麼會使用CountDownLatch
仔細點話會發現程式碼中增加countDownLatch,這個是為了檢測zk建立成功之後才會返回連線成功(CONNECTED),zk的啟動是非同步的,new之後立即獲取zk的狀態是CONNECTING的,無法正常使用。
我是紀先生,用輸出倒逼輸入而持續學習,持續分享技術系列文章,以及全網值得收藏好文,歡迎關注公眾號,做一個持續成長的技術人。
- ChatGPT對比文心一言,孰強孰弱?
- 搗鼓一個小程式
- ZooKeeper系列:zk中的watch
- 自己動手給mac換電池,省1000多
- 八股文系列:Java基礎知識
- ZooKeeper系列:叢集安裝和session會話
- 八股文系列:Java集合容器
- MySQL進階系列:MySQL主從複製和原理
- 虛擬機器系列:Java記憶體模型
- Hello,World!
- 虛擬機器系列:記憶體溢位OOM以及解決思路
- 虛擬機器系列:虛擬機器效能監控基礎工具
- 虛擬機器系列:JVM中的垃圾收集器
- 新年新氣象 給你的banner換個面板吧
- 驗證mybatis批量插入能否一次能插入1萬條資料
- 記錄下經常忘記的位運算--與或非......
- 組織架構資料同步效率優化,百倍提速
- 本地復現不了問題,那就線上(非正式)debug
- MySQL進階系列:一條sql是怎麼執行的
- MySQL進階系列:多版本併發控制mvcc的實現