上文【從入門到放棄-ZooKeeper】ZooKeeper實戰-分佈式隊列中,咱們一塊兒寫了下如何經過ZooKeeper的持久性順序節點實現一個分佈式隊列。java
本文咱們來一塊兒寫一個ZooKeeper的實現的分佈式鎖。node
參考以前學習的【從入門到放棄-Java】併發編程-JUC-locks-ReentrantLock,實現java.util.concurrent.locks.Lock接口。編程
咱們經過重寫接口中的方法實現一個可重入鎖。併發
咱們使用ZooKeeper的EPHEMERAL臨時節點機制,若是能建立成功的話,則獲取鎖成功,釋放鎖或客戶端斷開鏈接後,臨時節點自動刪除,這樣能夠避免誤刪除或漏刪除的狀況。分佈式
獲取鎖失敗後,這裏咱們使用輪詢的方式來不斷嘗試建立。其實應該使用Watcher機制來實現,這樣能避免大量的無用請求。在下一節更優雅的分佈式鎖實現機制中咱們會用到。學習
public class DistributedLock implements Lock { private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class); //ZooKeeper客戶端,進行ZooKeeper操做 private ZooKeeper zooKeeper; //根節點名稱 private String dir; //加鎖節點 private String node; //ZooKeeper鑑權信息 private List<ACL> acls; //要加鎖節點 private String fullPath; //加鎖標識,爲0時表示未獲取到鎖,每獲取一次鎖則加一,釋放鎖時減一。減到0時斷開鏈接,刪除臨時節點。 private volatile int state; /** * Constructor. * * @param zooKeeper the zoo keeper * @param dir the dir * @param node the node * @param acls the acls */ public DistributedLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) { this.zooKeeper = zooKeeper; this.dir = dir; this.node = node; this.acls = acls; this.fullPath = dir.concat("/").concat(node); init(); } private void init() { try { Stat stat = zooKeeper.exists(dir, false); if (stat == null) { zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("[DistributedLock#init] error : " + e.toString(), e); } } }
public void lock() { //經過state實現重入機制,若是已經獲取鎖,則將state++便可。 if (addLockCount()) { return; } //一直嘗試獲取鎖,知道獲取成功 for (;;) { try { //建立臨時節點 zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); //第一次獲取鎖,state++,這裏不須要使用加鎖機制保證原子性,由於同一時間,最多隻有一個線程能create節點成功。 state++; break; } catch (InterruptedException ie) { //若是捕獲中斷異常,則設置當前線程爲中斷狀態 logger.error("[DistributedLock#lock] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //若是捕獲到的異常是 節點已存在 外的其餘異常,則設置當前線程爲中斷狀態 logger.error("[DistributedLock#lock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } }
public void lockInterruptibly() throws InterruptedException { //經過state實現重入機制,若是已經獲取鎖,則將state++便可。 if (addLockCount()) { return; } for (;;) { //若是當前線程爲中斷狀態,則拋出中斷異常 if (Thread.interrupted()) { throw new InterruptedException(); } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; break; } catch (InterruptedException ie) { //若是捕獲中斷異常,則設置當前線程爲中斷狀態 logger.error("[DistributedLock#lockInterruptibly] error : " + ie.toString(), ie); Thread.currentThread().interrupt(); } catch (KeeperException ke) { //若是捕獲到的異常是 節點已存在 外的其餘異常,則設置當前線程爲中斷狀態 logger.error("[DistributedLock#lockInterruptibly] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { Thread.currentThread().interrupt(); } } } }
public boolean tryLock() { //經過state實現重入機制,若是已經獲取鎖,則將state++便可。 if (addLockCount()) { return true; } //若是獲取成功則返回true,失敗則返回false try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (Exception e) { logger.error("[DistributedLock#tryLock] error : " + e.toString(), e); } return false; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { //經過state實現重入機制,若是已經獲取鎖,則將state++便可。 if (addLockCount()) { return true; } //若是嘗試獲取超時,則返回false long nanosTimeout = unit.toNanos(time); if (nanosTimeout <= 0L) { return false; } final long deadline = System.nanoTime() + nanosTimeout; for (;;) { //若是當前線程爲中斷狀態,則拋出中斷異常 if (Thread.interrupted()) { throw new InterruptedException(); } //若是嘗試獲取超時,則返回false nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { return false; } try { zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL); state++; return true; } catch (InterruptedException ie) { //若是捕獲中斷異常,則返回false logger.error("[DistributedLock#tryLock] error : " + ie.toString(), ie); return false; } catch (KeeperException ke) { //若是捕獲到的異常是 節點已存在 外的其餘異常,則返回false logger.error("[DistributedLock#tryLock] error : " + ke.toString(), ke); if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) { return false; } } } }
public void unlock() { //經過state實現重入機制,若是已經獲取鎖,釋放鎖時,須要將state--。 delLockCount(); //若是state爲0時,說明再也不持有鎖,須要將鏈接關閉,自動刪除臨時節點 if (state == 0 && zooKeeper != null) { try { zooKeeper.close(); } catch (InterruptedException e) { logger.error("[DistributedLock#unlock] error : " + e.toString(), e); } } }
private boolean addLockCount() { //若是state大於0,即已持有鎖,將state數量加一 if (state > 0) { synchronized (this) { if (state > 0) { state++; return true; } } } return false; }
private boolean delLockCount() { //若是state大於0,即還持有鎖,將state數量減一 if (state > 0) { synchronized (this) { if (state > 0) { state--; return true; } } } return false; }
上面就是一個經過ZooKeeper實現的分佈式可重入鎖,利用了臨時節點的特性。源代碼可見:aloofJr優化
其中有幾個能夠優化的點。this
下節咱們使用新的方式實現分佈式鎖來解決上面的幾個問題,若是你們好的優化建議,歡迎一塊兒討論。spa
更多文章線程
見個人博客:https://nc2era.com
written by AloofJr,轉載請註明出處
本文爲雲棲社區原創內容,未經容許不得轉載。