「從入門到放棄-ZooKeeper」ZooKeeper實戰-分佈式鎖

前言

上文【從入門到放棄-ZooKeeper】ZooKeeper實戰-分佈式隊列中,咱們一塊兒寫了下如何經過ZooKeeper的持久性順序節點實現一個分佈式隊列。java

本文咱們來一塊兒寫一個ZooKeeper的實現的分佈式鎖。node

設計

參考以前學習的【從入門到放棄-Java】併發編程-JUC-locks-ReentrantLock,實現java.util.concurrent.locks.Lock接口。編程

咱們經過重寫接口中的方法實現一個可重入鎖。併發

  • lock:請求鎖,若是成功則直接返回,不成功則阻塞 直到獲取鎖。
  • lockInterruptibly:請求鎖,若是失敗則一直阻塞等待 直到獲取鎖或線程中斷
  • tryLock:一、嘗試獲取鎖,獲取失敗的話 直接返回false,不會再等待。二、嘗試獲取鎖,獲取成功返回true,不然一直請求,直到超時返回false
  • unlock:釋放鎖

咱們使用ZooKeeper的EPHEMERAL臨時節點機制,若是能建立成功的話,則獲取鎖成功,釋放鎖或客戶端斷開鏈接後,臨時節點自動刪除,這樣能夠避免誤刪除或漏刪除的狀況。分佈式

獲取鎖失敗後,這裏咱們使用輪詢的方式來不斷嘗試建立。其實應該使用Watcher機制來實現,這樣能避免大量的無用請求。在下一節更優雅的分佈式鎖實現機制中咱們會用到。學習

DistributedLock

public class DistributedLock implements Lock { private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class); //ZooKeeper客戶端,進行ZooKeeper操做 private ZooKeeper zooKeeper; //根節點名稱 private String dir; //加鎖節點 private String node; //ZooKeeper鑑權信息 private List<ACL> acls; //要加鎖節點 private String fullPath; //加鎖標識,爲0時表示未獲取到鎖,每獲取一次鎖則加一,釋放鎖時減一。減到0時斷開鏈接,刪除臨時節點。 private volatile int state; /** * Constructor. * * @param zooKeeper the zoo keeper * @param dir the dir * @param node the node * @param acls the acls */ public DistributedLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) { this.zooKeeper = zooKeeper; this.dir = dir; this.node = node; this.acls = acls; this.fullPath = dir.concat("/").concat(node); init(); } private void init() { try { Stat stat = zooKeeper.exists(dir, false); if (stat == null) { zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("[DistributedLock#init] error : " + e.toString(), e); } } }
 

lock

public void lock() { //經過state實現重入機制,若是已經獲取鎖,則將state++便可。 if (addLockCount()) { return; } //一直嘗試獲取鎖,知道獲取成功 for (;;) { try { //建立臨時節點 zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); //第一次獲取鎖,state++,這裏不須要使用加鎖機制保證原子性,由於同一時間,最多隻有一個線程能create節點成功。 state++; break; } catch (InterruptedException ie) { //若是捕獲中斷異常,則設置當前線程爲中斷狀態 logger.error("[DistributedLock#lock] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //若是捕獲到的異常是 節點已存在 外的其餘異常,則設置當前線程爲中斷狀態 logger.error("[DistributedLock#lock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } }
 

lockInterruptibly

public void lockInterruptibly() throws InterruptedException { //經過state實現重入機制,若是已經獲取鎖,則將state++便可。 if (addLockCount()) { return; } for (;;) { //若是當前線程爲中斷狀態,則拋出中斷異常 if (Thread.interrupted()) { throw new InterruptedException(); } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; break; } catch (InterruptedException ie) { //若是捕獲中斷異常,則設置當前線程爲中斷狀態 logger.error("[DistributedLock#lockInterruptibly] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //若是捕獲到的異常是 節點已存在 外的其餘異常,則設置當前線程爲中斷狀態 logger.error("[DistributedLock#lockInterruptibly] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } }
 

tryLock

public boolean tryLock() { //經過state實現重入機制,若是已經獲取鎖,則將state++便可。 if (addLockCount()) { return true; } //若是獲取成功則返回true,失敗則返回false try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (Exception e) { logger.error("[DistributedLock#tryLock] error : " + e.toString(), e); } return false; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { //經過state實現重入機制,若是已經獲取鎖,則將state++便可。 if (addLockCount()) { return true; } //若是嘗試獲取超時,則返回false long nanosTimeout = unit.toNanos(time); if (nanosTimeout <= 0L) { return false; } final long deadline = System.nanoTime() + nanosTimeout; for (;;) { //若是當前線程爲中斷狀態,則拋出中斷異常 if (Thread.interrupted()) { throw new InterruptedException(); } //若是嘗試獲取超時,則返回false nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { return false; } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (InterruptedException ie) { //若是捕獲中斷異常,則返回false logger.error("[DistributedLock#tryLock] error : " + ie.toString(), ie); return false; } catch (KeeperException ke) { //若是捕獲到的異常是 節點已存在 外的其餘異常,則返回false logger.error("[DistributedLock#tryLock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { return false; } } } }
 

unlock

public void unlock() { //經過state實現重入機制,若是已經獲取鎖,釋放鎖時,須要將state--。 delLockCount(); //若是state爲0時,說明再也不持有鎖,須要將鏈接關閉,自動刪除臨時節點 if (state == 0 && zooKeeper != null) { try { zooKeeper.close(); } catch (InterruptedException e) { logger.error("[DistributedLock#unlock] error : " + e.toString(), e); } } }
 

addLockCount

private boolean addLockCount() { //若是state大於0,即已持有鎖,將state數量加一 if (state > 0) { synchronized (this) { if (state > 0) { state++; return true; } } } return false; }
 

delLockCount

private boolean delLockCount() { //若是state大於0,即還持有鎖,將state數量減一 if (state > 0) { synchronized (this) { if (state > 0) { state--; return true; } } } return false; }
 

總結

上面就是一個經過ZooKeeper實現的分佈式可重入鎖,利用了臨時節點的特性。源代碼可見:aloofJr優化

其中有幾個能夠優化的點。this

  • 輪詢的方式換成Watcher機制
  • 可重入鎖實現方式的優化
  • 全部線程競爭一個節點的建立,容易出現羊羣效應,且是一種不公平的鎖競爭模式

下節咱們使用新的方式實現分佈式鎖來解決上面的幾個問題,若是你們好的優化建議,歡迎一塊兒討論。spa

更多文章線程

見個人博客:https://nc2era.com

written by AloofJr,轉載請註明出處

 

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索