Curator框架實現ZooKeeper分佈式鎖

 排他鎖(X)

這裏主要講講分佈式鎖中的排他鎖。排他鎖(Exclusive Locks,簡稱X鎖),又稱爲寫鎖或獨佔鎖,是一種基本的鎖類型。若是事務T1對數據對象O1加上了排他鎖,那麼在整個加鎖期間,只容許T1對O1進行數據的讀取和更新操做,其它任何事務都不能對O1進行任何類型的操做,直道T1釋放了排他鎖。java

定義鎖

在ZooKeeper中,能夠經過在ZooKeeper中建立一個數據節點來表示一個鎖。好比,/exclusive_lock/lock節點(znode)就能夠表示爲一個鎖。node

獲取鎖

在須要獲取排他鎖時,全部的客戶端都會試圖經過create()接口,在/exclusive_lock節點下建立臨時的子節點/exclusive_lock/lock,但ZooKeeper的強一致性最終只會保證僅有一個客戶單能建立成功,那麼就認爲該客戶端獲取了鎖。同時,全部沒有獲取鎖的客戶端事務只能處於等待狀態,這些處於等待狀態的客戶端事先能夠在/exclusive_lock節點上註冊一個子節點變動的Watcher監聽,以便實時監聽到子節點的變動狀況。apache

釋放鎖

 在「定義鎖」部分,咱們已經提到/exclusive_lock/lock是一個臨時節點,所以在如下兩種狀況下可能釋放鎖。服務器

  • 當前獲取鎖的客戶端發生宕機,那麼ZooKeeper服務器上保存的臨時性節點就會被刪除;
  • 正常執行完業務邏輯後,由客戶端主動來將本身建立的臨時節點刪除。

不管什麼狀況下,臨時節點/exclusive_lock/lock被移除,ZooKeeper都會通知在/exclusive_lock註冊了子節點變動Watcher監聽的客戶端。這些客戶端在接收到通知之後就會再次發起獲取鎖的操做,即重複「獲取鎖」過程。排他鎖流程以下:session

下面的代碼(java)演示了使用Curator框架來實現ZooKeeper分佈式鎖框架

import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;


public class ZkLock {

  @SneakyThrows
  public static void main(String[] args) {

    final String connectString = "localhost:2181,localhost:2182,localhost:2183";

    // 重試策略,初始化每次重試之間須要等待的時間,基準等待時間爲1秒。
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

    // 使用默認的會話時間(60秒)和鏈接超時時間(15秒)來建立 Zookeeper 客戶端
    @Cleanup CuratorFramework client = CuratorFrameworkFactory.builder().
        connectString(connectString).
        connectionTimeoutMs(15 * 1000).
        sessionTimeoutMs(60 * 100).
        retryPolicy(retryPolicy).
        build();

    // 啓動客戶端
    client.start();

    final String lockNode = "/lock_node";
    InterProcessMutex lock = new InterProcessMutex(client, lockNode);
    try {
      // 1. Acquire the mutex - blocking until it's available.
      lock.acquire();

      // OR

      // 2. Acquire the mutex - blocks until it's available or the given time expires.
      if (lock.acquire(60, TimeUnit.MINUTES)) {
        Stat stat = client.checkExists().forPath(lockNode);
        if (null != stat){
          // Dot the transaction
        }
      }
    } finally {
      if (lock.isAcquiredInThisProcess()) {
        lock.release();
      }
    }
  }

}

 maven引用maven

<!--curator這個開源項目提供zookeeper分佈式鎖實現-->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>2.8.0</version>
</dependency>
相關文章
相關標籤/搜索