分散式前修課:Zookeeper鎖實現方式

語言: CN / TW / HK

前言

聊完MySQL和Redis,我們接下來在聊一聊Zookeeper。相信大家都已經發現了,這些都是我們在開發過程非常常用的技術。搞定他們,一切難題都不在話下。

Zookeeper,盤它

官網 是我們學習某一種技術框架的第一手資料,通過官網我們能挖掘到該框架的最新動態

What Is Zookeeper

Zookeeper是一款主要解決分散式協調的服務框架,可以用來維護配置資訊、命名、提供分散式同步和服務提供等功能。Zookeeper基於ZAB【ZooKeeper 原子廣播】協議,支援高可用。

圖片來源自 官方介紹

Zookeeper的設計

設計目標

Zookeeper的設計很簡單,其目的就是為了:

  • 減輕分散式應用程式實現協調服務的壓力,允許分散式程序通過共享的分層名稱空間相互協調

而在Zookeeper中的檔案儲存可以稱為: znodes ,類似於Linux下的目錄和檔案;而不同的一點是: ZooKeeper 資料儲存在記憶體中 。這樣也就意味著Zookeeper自身可以實現實現高吞吐量和低延遲

名稱空間設計

圖片來源自 官方介紹

Zookeeper中名稱全部由斜槓【/】 分隔的一系列路徑元素,名稱空間中的每個節點都由路徑標識。而每個節點都可以擁有與其關聯的資料以及子節點。這就像擁有一個允許檔案也成為目錄的檔案系統

專業點來說Zookeeper中的每一個節點都可以稱為 znode , 主要分為兩類:

  • 持久節點:【節點只要建立就存在,除非手動刪除】
  • 臨時節點:【只要建立znode的會話處於活動狀態,那麼當前節點就會存在;當會話結束,臨時節點自動刪除】

有序節點是在臨時節點和持久節點的基礎上建立的時候後面跟上順序,本質上沒有發生很大的變化

Zookeeper提供了監聽/回撥的機制,當客戶端對 znode 進行操作之後,會觸發watch機制,客戶端受到 znode 已經改變的資料包。

從開發角度來看,這種屬於 Reactor程式設計模型 ,純非同步程式設計

Netty就是這種程式設計模型的典型案例

穩定大局

對Zookeeper有一點了解之後,我們就要開始使用它了,我們使用它的目的是為了實現分散式鎖。那麼我們先來搞定基礎環境

我們這裡先按照單機環境來做,後面會給出叢集環境的配置方式

需要注意的是: 2N + 1原則

Zookeeper叢集最少需要三臺伺服器,並且強烈建議使用 奇數臺 伺服器。如果您只有兩臺伺服器,那麼您會遇到這樣的情況:如果其中一臺出現故障,則沒有足夠的機器來形成多數法定人數。兩臺伺服器本質上 不如 一臺伺服器穩定,因為有兩個單點故障

環境規劃

我們這裡使用的Zookeeper版本: 3.6.2

Zookeeper強依賴於JDK,並且需要安裝JDk1.8之上的版本

node ip port
zookeeper 192.168.10.200 2181

單機環境

環境規劃完成之後,接下來就看我操作吧。

 wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
 # 解壓
 tar xf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local/
 ​
 # 進入到/usr/local下,改個名字
 mv apache-zookeeper-3.6.2-bin/ zookeeper-3.6.2

其實到這裡環境就已經安裝完成了,下面就是針對Zookeeper的配置

 # 配置檔案全部存放在conf下,並且我們需要將模板配置換成`zoo.cfg`,不然無法生效
 cd /usr/local/zookeeper-3.6.2/conf && cp ./zoo_sample.cfg ./zoo.cfg
 ​
 vim zoo.cfg
  # 預設在tmp下,但是tmp屬於系統臨時檔案目錄,我們最好進行修改
  dataDir=/var/data/bigdata/zookeeper

按照 zoo.cfg 中的配置,我們也只需要改動 dataDir 的目錄就可以了,其他的暫時預設就好

關於Zookeeper更多的配置,在官網《 配置引數 》中找到

環境變數配置

 # 編輯配置
 vim /etc/profile
  export ZOOKEEPER_HOME=/usr/local/zookeeper-3.6.2
  export JAVA_HOME=/usr/java/jdk1.8.0_221-amd64
  export PATH=$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH
  
 # 使其生效
 source /etc/profile

下面就開始啟動階段了

 # 以下為啟動的全部命令
 zkServer.sh [--config <conf-dir>] {start|start-foreground|stop|version|restart|status|print-cmd}
 ​
 # 啟動:這裡已經將Zookeeper加入到了環境變數中
 zkServer.sh start
 # 展示啟動狀態
 zkServer.sh status

叢集配置

叢集配置環境下,需要改變兩個地方:

第一步:在 zoo.cfg 配置檔案中新增叢集節點的配置

 server.1=192.168.10.200:2181:2888:3888
 server.2=192.168.10.201:2181:2888:3888
 server.3=192.168.10.202:2181:2888:3888

第二步:在各自節點的 $dataDir 目錄下新增 myid 檔案,內容對應上面配置的序號

 echo 1 > myid
 echo 2 > myid
 echo 3 > myid

記得要和zoo.cfg中配置的唯一序號一一對應

叢集對比單機版只是多了一些配置,其他的沒有任何變化。相對比還是非常簡單的

客戶端操作

Zookeeper提供了命令列的操作方式,通過 zkCli.sh 來啟動,並且操作方式和 Linux命令 基本相同,下面我們簡單演示一下

 # 本地環境可以不配置
 zkCli.sh [-server 127.0.0.1:2181]

下面通過一張圖來簡單介紹一些Zookeeper的增刪改查吧

這其實非常簡單的,而且我們並不用搞懂它,畢竟我們在操作的時候並不能直接連到伺服器上,下面我們來看看如何通過提供的API來對Zookeeper進行操作吧

鎖住它

知其然

在《分散式鎖原理》一文中我們曾經介紹過基於Zookeeper實現分散式鎖的思路,主要通過Zookeeper的臨時節點來實現:

  • 在主節點下每個客戶端過來都會註冊臨時有序節點
  • 每個節點只監聽自己前一個節點,如果發現自己是第一個節點,說明已經獲取到了鎖

而只要客戶端斷開session連線,臨時有序節點自動刪除,客戶端鎖就被釋放

知其所以然

下面我們就通過Zookeeper的API來實現一個分散式鎖吧。還是老樣子,一版自己寫,一版看看人家的實現方式。對比一下。

原生程式碼

 private static final CountDownLatch LATCH = new CountDownLatch(1);
 // 獲取ZooKeeper的操作
 public static ZooKeeper getZk() {
     ZooKeeper zooKeeper = null;
     try {
         zooKeeper = new ZooKeeper("192.168.10.200:2181/locks", 1000, event -> {
             switch (event.getState()) {
                 case SyncConnected:
                     // 等到回到 連結成功的事件,就能釋放阻塞
                     LATCH.countDown();
                     break;
             }
         });
         //Reactor程式設計模型,返回很快,但是記憶體中並沒有構建完成,所以需要等待
         LATCH.await();
     } catch (Exception e) {
         e.printStackTrace();
     }
     return zooKeeper;
 }

主要程式碼

public class LockWatchCallback implements Watcher, AsyncCallback.StringCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback {

    private ZooKeeper zk;
    private String name;
    private String nodePathName;

    private CountDownLatch latch = new CountDownLatch(1);

    public LockWatchCallback(ZooKeeper zk, String name) {
        this.zk = zk;
        this.name = name;
    }

    public void lock() {
        /**
         * 建立節點:
         *  path: 如果在192.168.10.200:2181/locks指定了目錄,那麼這裡的 根目錄 代表的是 /locks,然後在建立對應的臨時節點
         *  ZooDefs.Ids.OPEN_ACL_UNSAFE: 許可權:全部開放
         *  CreateMode.EPHEMERAL_SEQUENTIAL: 臨時有序節點
         *  StringCallback: 節點建立完成之後的回撥
         */
        zk.create("/lock", name.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, name);
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void unLock() {
        try {
            zk.delete(nodePathName, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            // 當節點刪除之後,重新拉取一次全部子節點,然後進行監聽處理
            case NodeDeleted:
                zk.getChildren("/", false, this, "abc");
                break;
        }

    }

    // zk.create("/lock", name.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, name); 回撥
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if (null != name) {
            nodePathName = name;
            // 得到根節點下建立的節點,我們不需要watch根目錄
            zk.getChildren("/", false, this, "abc");
        }
    }

    // zk.getChildren("/", false, this, "abc"); 回撥
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        // 得到的children是無序的,所以要先做一個排序
        Collections.sort(children);

        // /lock0000000000, 而children中是沒有斜線的,所以要擷取一下
        int i = 1;
        if ((i = children.indexOf(nodePathName.substring(1))) < 1) {
            // 自己已經是第一個節點了,獲取到了鎖,開始執行
            try {
                zk.setData("/", this.name.getBytes(StandardCharsets.UTF_8), 1);
            } catch (Exception e) {
                e.printStackTrace();
            }
            // 釋放掉阻塞,讓執行
            latch.countDown();
        } else {
            // 監控自己的前一個節點是否還存在
            try {
                zk.exists("/" + children.get(i - 1), this, this, "abc");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
    }
}

全程採用Zookeeper提供的非同步API方式進行回撥處理,在每一步回撥的地方都添加了註釋,看起來是比較方便的。

畢竟是個不成熟的小案例,缺少了分散式鎖的很多特性,比如:鎖重入等等

『不用太刻意對上面的程式碼做研究,在生產環境下是不會使用這樣的程式碼的』

下面我看一下如何測試:為了能和之前的程式進行統一,做了一個小小的封裝,也可以直接使用 LockWatchCallback 物件來處理鎖操作

public class ZookeeperLock extends AbstractLock {

    ZooKeeper zk;
    LockWatchCallback watchCallback;
    public ZookeeperLock(ZooKeeper zk) {
        this.zk = zk;
    }

    @Override
    public void start() {
        // 每個執行緒都需要建立一個臨時有序節點,所以每個執行緒都會new一個watchCallback物件
        watchCallback = new LockWatchCallback(zk, Thread.currentThread().getName());
    }

    @Override
    public void lock() {
        // 加鎖,建立節點
        this.watchCallback.lock();
    }

    @Override
    public void unlock() {
        // 解鎖,刪除節點
        this.watchCallback.unLock();
    }

    @Override
    public void destory() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static ExecutorService executorService = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException {
        int[] count = {0};
        final ZookeeperLock zkLock = new ZookeeperLock(getZk());
        for (int i = 0; i < 100; i++) {
            executorService.submit(() -> {
                zkLock.start();

                zkLock.lock();
                count[0]++;
                zkLock.unlock();
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        System.out.println(count[0]);

        zkLock.destory();
    }
}

成熟框架

那接下來我們就聊一聊成熟的框架是怎麼實現分散式鎖的: Curator

  • 實現方式是不變的,不過在我們上一版的基礎豐富了更多的鎖特性,並且實現更加穩定,呼叫更加方便
public class ZkLock extends AbstractLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZkLock.class);

    /**
     * Zookeeper地址 ip:port
     */
    private final String zkAddr;
    /**
     * 總路徑
     */
    private final String lockPath;
    private CuratorFramework client;
    private InterProcessLock lock;

    public ZkLock(String zkAddr, String lockPath) {
        this.zkAddr = zkAddr;
        this.lockPath = lockPath;
    }

    @Override
    public void lock() {
        try {
            this.lock.acquire();
        } catch (Exception e) {
            LOGGER.error("Lock異常,異常資訊:{}", e.getMessage());
        }
    }

    @Override
    public boolean tryLock() {
        boolean isLocked = false;
        try {
            isLocked = this.lock.acquire(0, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOGGER.error("tryLock異常,異常資訊:{}", e.getMessage());
        }
        return isLocked;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        boolean isLocked = false;
        try {
            isLocked = this.lock.acquire(time, unit);
        } catch (Exception e) {
            LOGGER.error("tryLock異常,異常資訊:{}", e.getMessage());
        }
        return isLocked;
    }

    @Override
    public void unlock() {
        try {
            this.lock.release();
        } catch (Exception e) {
            LOGGER.error("unlock異常,異常資訊:{}", e.getMessage());
        }
    }

    @Override
    public void start() {
        client = CuratorFrameworkFactory.newClient(
                this.zkAddr,
                new RetryNTimes(5, 5000)
        );
        client.start();
        if (client.getState() == CuratorFrameworkState.STARTED) {
            LOGGER.info("zk client start successfully!");
            LOGGER.info("zkAddress:{},lockPath:{}", this.zkAddr, lockPath);
        } else {
            throw new RuntimeException("客戶端啟動失敗。。。");
        }
        this.lock = defaultLock(lockPath);
    }

    /**
     * 公平可重入鎖
     *
     * @param lockPath 路徑
     * @return InterProcessMutex
     */
    InterProcessLock defaultLock(String lockPath) {
        return new InterProcessMutex(client, lockPath);
    }
}

看看這個程式碼量是不是簡潔了很多,雖然簡潔,但是功能俱全。我們來驗證一下:

private static ExecutorService executorService = Executors.newCachedThreadPool();

public static void main(String[] args) throws InterruptedException {
    ZkLock zkLock = new ZkLock("192.168.10.200:2181","/locks");
    zkLock.start();

    int[] num = {0};
    long start = System.currentTimeMillis();
    for(int i=0;i<200;i++){
        executorService.submit(()->{
            try {
                zkLock.lock();
                num[0]++;
            } catch (Exception e){
                throw new RuntimeException(e);
            } finally {
                zkLock.unlock();
            }
        });

    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.HOURS);
    System.out.println(num[0]);

}

完全OK!!!

最後

關於Zookeeper分散式鎖的實現我們就介紹到這裡。Zookeeper在實際使用中的場景還是非常豐富的,包括分散式協調等功能都在等著大家一一探索。

而關於分散式鎖還有最後一個章節就結束了,接下來我們就來了解一下關於後起之秀Etcd的相關操作和Etcd是如何實現分散式鎖的。