前言:html
因爲在平時的工做中,線上服務器是分佈式多臺部署的,常常會面臨解決分佈式場景下數據一致性的問題,那麼就要利用分佈式鎖來解決這些問題。以本身結合實際工做中的一些經驗和網上看到的一些資料,作一個講解和總結。以前我已經寫了一篇關於分佈式鎖的文章: 分佈式鎖1 Java經常使用技術方案 。上一篇文章中主要寫的是在平常項目中,較爲常見的幾種實現分佈式鎖的方法。經過這些方法,基本上能夠解決咱們平常工做中大部分場景下使用分佈式鎖的問題。java
本篇文章主要是在上一篇文章的基礎上,介紹一些雖然平常工做中不經常使用或者比較實現起來比較重,可是能夠做爲技術方案學習瞭解一下的分佈式鎖方案。但願這篇文章能夠方便本身之後查閱,同時要是能幫助到他人那也是很好的。redis
===============================================================長長的分割線====================================================================apache
正文:服務器
第一步,使用zookeeper節點名稱惟一性,用於分佈式鎖:session
關於zookeeper集羣的搭建,能夠參考我以前寫的一篇文章: ZooKeeper1 利用虛擬機搭建本身的ZooKeeper集羣多線程
zookeeper抽象出來的節點結構是一個和文件系統相似的小型的樹狀的目錄結構,同時zookeeper機制規定:同一個目錄下只能有一個惟一的文件名。例如:咱們在zookeeper的根目錄下,由兩個客戶端同時建立一個名爲/myDistributeLock,只有一個客戶端能夠成功。併發
上述方案和memcached的add()方法、redis的setnx()方法實現分佈式鎖有着相同的思路。這樣的方案實現起來若是不考慮搭建和維護zookeeper集羣的成本,因爲正確性和可靠性是zookeeper機制本身保證的,實現仍是比較簡單的。分佈式
第二步,使用zookeeper臨時順序節點,用於分佈式鎖:ide
在討論這套方案以前,咱們有必要先「吹毛求疵」般的說明一下使用zookeeper節點名稱惟一性來作分佈式鎖這個方案的缺點。好比,當許多線程在等待一個鎖時,若是鎖獲得釋放的時候,那麼全部客戶端都被喚醒,可是僅僅有一個客戶端獲得鎖。在這個過程當中,大量的線程根本沒有得到鎖的可能性,可是也會引發大量的上下文切換,這個系統開銷也是不小的,對於這樣的現象有一個專業名詞,稱之爲「驚羣效應」。
咱們首先說明一下zookeeper的順序節點、臨時節點和watcher機制:
所謂順序節點,假如咱們在/myDisLocks/目錄下建立3個節點,zookeeper集羣會按照發起建立的順序來建立節點,節點分別爲/myDisLocks/000000000一、/myDisLocks/000000000二、/myDisLocks/0000000003。
所謂臨時節點,臨時節點由某個客戶端建立,當客戶端與zookeeper集羣斷開鏈接,則該節點自動被刪除。
所謂對於watcher機制,你們能夠參考Apache ZooKeeper Watcher機制源碼解釋。固然若是你以前不知道watcher機制是個什麼東東,不建議你直接去看前邊我提供的文章連接,這樣你極有可能忘掉咱們的討論主線,即分佈式鎖的實現方案,而陷入到watcher機制的源碼實現中。因此你也能夠先看看下面的具體方案,猜想一下watcher是用來幹嗎的,我這裏先總結一句話作個引子: 所謂watcher機制,你能夠簡單一點兒理解成任何一個鏈接zookeeper的客戶端能夠經過watcher機制關注本身感興趣的節點的增刪改查,當這個節點發生增刪改查的操做時,會「廣播」本身的消息,全部對此感興趣的節點能夠在收到這些消息後,根據本身的業務須要執行後續的操做。
具體的使用步驟以下:
1. 每一個業務線程調用create()方法建立名爲「/myDisLocks/thread」的節點,須要注意的是,這裏節點的建立類型須要設置爲EPHEMERAL_SEQUENTIAL,即節點類型爲臨時順序節點。此時/myDisLocks節點下會出現諸如/myDisLocks/thread000000000一、/myDisLocks/thread000000000二、/myDisLocks/thread0000000003這樣的子節點。
2. 每一個業務線程調用getChildren(「myDisLocks」)方法來獲取/myDisLocks這個節點下全部已經建立的子節點。
3. 每一個業務線程獲取到全部子節點的路徑以後,若是發現本身在步驟1中建立的節點的尾綴編號是全部節點中序號最小的,那麼就認爲本身得到了鎖。
4. 若是在步驟3中發現本身並不是是全部子節點中序號最小的,說明本身尚未獲取到鎖。使用watcher機制監視比本身建立節點的序列號小的節點(比本身建立的節點小的最大節點),進入等待。好比,若是當前業務線程建立的節點是/myDisLocks/thread0000000003,那麼在沒有獲取到鎖的狀況下,他只須要監視/myDisLocks/thread0000000002的狀況。只有當/myDisLocks/thread0000000002獲取到鎖並釋放以後,當前業務線程才啓動獲取鎖,這樣能夠避免一個業務線程釋放鎖以後,其餘全部線程都去競爭鎖,引發沒必要要的上下文切換,最終形成「驚羣現象」。
5. 釋放鎖的過程相對比較簡單,就是刪除本身建立的那個子節點便可。
注意: 這個方案實現的分佈式鎖還帶着一點兒公平鎖的味道!爲何呢?咱們在利用每一個節點的序號進行排隊以此來避免進羣現象時,實際上全部業務線程得到鎖的順序就是本身建立節點的順序,也就是哪一個業務線程先來,哪一個就能夠最快得到鎖。
下面貼出我本身實現的上述方案的代碼:
1. 代碼中有兩個Java類: MyDistributedLockByZK.java和LockWatcher.java。其中MyDistributedLockByZK.java中的main函數利用線程池啓動5個線程,以此來模擬多個業務線程競爭鎖的狀況;而LockWatcher.java定義分佈式鎖和實現了watcher機制。
2. 同時,我使用的zookeeper集羣是本身之前利用VMWare搭建的集羣,因此zookeeper連接是192.168.224.170:2181,你們能夠根據替換成本身的zookeeper連接便可。
1 public class MyDistributedLockByZK { 2 /** 線程池 **/ 3 private static ExecutorService executorService = null; 4 private static final int THREAD_NUM = 5; 5 private static int threadNo = 0; 6 private static CountDownLatch threadCompleteLatch = new CountDownLatch(THREAD_NUM); 7 8 /** ZK的相關配置常量 **/ 9 private static final String CONNECTION_STRING = "192.168.224.170:2181"; 10 private static final int SESSION_TIMEOUT = 10000; 11 // 此變量在LockWatcher中也有一個同名的靜態變量,正式使用的時候,提取到常量類中共同維護便可。 12 private static final String LOCK_ROOT_PATH = "/myDisLocks"; 13 14 public static void main(String[] args) { 15 // 定義線程池 16 executorService = Executors.newFixedThreadPool(THREAD_NUM, new ThreadFactory() { 17 @Override 18 public Thread newThread(Runnable r) { 19 String name = String.format("第[%s]個測試線程", ++threadNo); 20 Thread ret = new Thread(Thread.currentThread().getThreadGroup(), r, name, 0); 21 ret.setDaemon(false); 22 return ret; 23 } 24 }); 25 26 // 啓動線程 27 if (executorService != null) { 28 startProcess(); 29 } 30 } 31 32 /** 33 * @author zhangyi03 34 * @date 2017-5-23 下午5:57:27 35 * @description 模擬併發執行任務 36 */ 37 public static void startProcess() { 38 Runnable disposeBusinessRunnable= new Thread(new Runnable() { 39 public void run() { 40 String threadName = Thread.currentThread().getName(); 41 42 LockWatcher lock = new LockWatcher(threadCompleteLatch); 43 try { 44 /** 步驟1: 當前線程建立ZK鏈接 **/ 45 lock.createConnection(CONNECTION_STRING, SESSION_TIMEOUT); 46 47 /** 步驟2: 建立鎖的根節點 **/ 48 // 注意,此處建立根節點的方式其實徹底能夠在初始化的時候由主線程單獨進行根節點的建立,沒有必要在業務線程中建立。 49 // 這裏這樣寫只是一種思路而已,沒必要侷限於此 50 synchronized (MyDistributedLockByZK.class){ 51 lock.createPersistentPath(LOCK_ROOT_PATH, "該節點由" + threadName + "建立", true); 52 } 53 54 /** 步驟3: 開啓鎖競爭並執行任務 **/ 55 lock.getLock(); 56 } catch (Exception e) { 57 e.printStackTrace(); 58 } 59 } 60 }); 61 62 for (int i = 0; i < THREAD_NUM; i++) { 63 executorService.execute(disposeBusinessRunnable); 64 } 65 executorService.shutdown(); 66 67 try { 68 threadCompleteLatch.await(); 69 System.out.println("全部線程運行結束!"); 70 } catch (InterruptedException e) { 71 e.printStackTrace(); 72 } 73 } 74 }
1 public class LockWatcher implements Watcher { 2 /** 成員變量 **/ 3 private ZooKeeper zk = null; 4 // 當前業務線程競爭鎖的時候建立的節點路徑 5 private String selfPath = null; 6 // 當前業務線程競爭鎖的時候建立節點的前置節點路徑 7 private String waitPath = null; 8 // 確保鏈接zk成功;只有當收到Watcher的監聽事件以後,才執行後續的操做,不然請求阻塞在createConnection()建立ZK鏈接的方法中 9 private CountDownLatch connectSuccessLatch = new CountDownLatch(1); 10 // 標識線程是否執行完任務 11 private CountDownLatch threadCompleteLatch = null; 12 13 /** ZK的相關配置常量 **/ 14 private static final String LOCK_ROOT_PATH = "/myDisLocks"; 15 private static final String LOCK_SUB_PATH = LOCK_ROOT_PATH + "/thread"; 16 17 public LockWatcher(CountDownLatch latch) { 18 this.threadCompleteLatch = latch; 19 } 20 21 @Override 22 public void process(WatchedEvent event) { 23 if (event == null) { 24 return; 25 } 26 27 // 通知狀態 28 Event.KeeperState keeperState = event.getState(); 29 // 事件類型 30 Event.EventType eventType = event.getType(); 31 32 // 根據通知狀態分別處理 33 if (Event.KeeperState.SyncConnected == keeperState) { 34 if ( Event.EventType.None == eventType ) { 35 System.out.println(Thread.currentThread().getName() + "成功鏈接上ZK服務器"); 36 // 此處代碼的主要做用是用來輔助判斷當前線程確實已經鏈接上ZK 37 connectSuccessLatch.countDown(); 38 }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { 39 System.out.println(Thread.currentThread().getName() + "收到情報,排我前面的傢伙已掛,我準備再次確認我是否是最小的節點!?"); 40 try { 41 if(checkMinPath()){ 42 getLockSuccess(); 43 } 44 } catch (Exception e) { 45 e.printStackTrace(); 46 } 47 } 48 } else if ( Event.KeeperState.Disconnected == keeperState ) { 49 System.out.println(Thread.currentThread().getName() + "與ZK服務器斷開鏈接"); 50 } else if ( Event.KeeperState.AuthFailed == keeperState ) { 51 System.out.println(Thread.currentThread().getName() + "權限檢查失敗"); 52 } else if ( Event.KeeperState.Expired == keeperState ) { 53 System.out.println(Thread.currentThread().getName() + "會話失效"); 54 } 55 } 56 57 /** 58 * @author zhangyi03 59 * @date 2017-5-23 下午6:07:03 60 * @description 建立ZK鏈接 61 * @param connectString ZK服務器地址列表 62 * @param sessionTimeout Session超時時間 63 * @throws IOException 64 * @throws InterruptedException 65 */ 66 public void createConnection(String connectString, int sessionTimeout) throws IOException, InterruptedException { 67 zk = new ZooKeeper(connectString, sessionTimeout, this); 68 // connectSuccessLatch.await(1, TimeUnit.SECONDS) 正式實現的時候能夠考慮此處是否採用超時阻塞 69 connectSuccessLatch.await(); 70 } 71 72 /** 73 * @author zhangyi03 74 * @date 2017-5-23 下午6:15:48 75 * @description 建立ZK節點 76 * @param path 節點path 77 * @param data 初始數據內容 78 * @param needWatch 79 * @return 80 * @throws KeeperException 81 * @throws InterruptedException 82 */ 83 public boolean createPersistentPath(String path, String data, boolean needWatch) throws KeeperException, InterruptedException { 84 if(zk.exists(path, needWatch) == null){ 85 String result = zk.create( path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 86 System.out.println(Thread.currentThread().getName() + "建立節點成功, path: " + result + ", content: " + data); 87 } 88 return true; 89 } 90 91 /** 92 * @author zhangyi03 93 * @date 2017-5-23 下午6:24:46 94 * @description 獲取分佈式鎖 95 * @throws KeeperException 96 * @throws InterruptedException 97 */ 98 public void getLock() throws Exception { 99 selfPath = zk.create(LOCK_SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 100 System.out.println(Thread.currentThread().getName() + "建立鎖路徑:" + selfPath); 101 if(checkMinPath()){ 102 getLockSuccess(); 103 } 104 } 105 106 /** 107 * @author zhangyi03 108 * @date 2017-5-23 下午7:02:41 109 * @description 獲取鎖成功 110 * @throws KeeperException 111 * @throws InterruptedException 112 */ 113 private void getLockSuccess() throws KeeperException, InterruptedException { 114 if(zk.exists(selfPath, false) == null){ 115 System.err.println(Thread.currentThread().getName() + "本節點已不在了..."); 116 return; 117 } 118 System.out.println(Thread.currentThread().getName() + "獲取鎖成功,開始處理業務數據!"); 119 Thread.sleep(2000); 120 System.out.println(Thread.currentThread().getName() + "處理業務數據完成,刪除本節點:" + selfPath); 121 zk.delete(selfPath, -1); 122 releaseConnection(); 123 threadCompleteLatch.countDown(); 124 } 125 126 /** 127 * @author zhangyi03 128 * @date 2017-5-23 下午7:06:46 129 * @description 關閉ZK鏈接 130 */ 131 private void releaseConnection() { 132 if (zk != null) { 133 try { 134 zk.close(); 135 } catch (InterruptedException e) { 136 e.printStackTrace(); 137 } 138 } 139 System.out.println(Thread.currentThread().getName() + "釋放ZK鏈接"); 140 } 141 142 /** 143 * @author zhangyi03 144 * @date 2017-5-23 下午6:57:14 145 * @description 檢查本身是否是最小的節點 146 * @param selfPath 147 * @return 148 * @throws KeeperException 149 * @throws InterruptedException 150 */ 151 private boolean checkMinPath() throws Exception { 152 List<String> subNodes = zk.getChildren(LOCK_ROOT_PATH, false); 153 // 根據元素按字典序升序排序 154 Collections.sort(subNodes); 155 System.err.println(Thread.currentThread().getName() + "建立的臨時節點名稱:" + selfPath.substring(LOCK_ROOT_PATH.length()+1)); 156 int index = subNodes.indexOf(selfPath.substring(LOCK_ROOT_PATH.length()+1)); 157 System.err.println(Thread.currentThread().getName() + "建立的臨時節點的index:" + index); 158 switch (index){ 159 case -1: { 160 System.err.println(Thread.currentThread().getName() + "建立的節點已不在了..." + selfPath); 161 return false; 162 } 163 case 0:{ 164 System.out.println(Thread.currentThread().getName() + "子節點中,我果真是老大" + selfPath); 165 return true; 166 } 167 default:{ 168 // 獲取比當前節點小的前置節點,此處只關注前置節點是否還在存在,避免驚羣現象產生 169 waitPath = LOCK_ROOT_PATH +"/"+ subNodes.get(index - 1); 170 System.out.println(Thread.currentThread().getName() + "獲取子節點中,排在我前面的節點是:" + waitPath); 171 try { 172 zk.getData(waitPath, true, new Stat()); 173 return false; 174 } catch (Exception e) { 175 if (zk.exists(waitPath, false) == null) { 176 System.out.println(Thread.currentThread().getName() + "子節點中,排在我前面的" + waitPath + "已失蹤,該我了"); 177 return checkMinPath(); 178 } else { 179 throw e; 180 } 181 } 182 } 183 184 } 185 } 186 }
第三步,使用memcached的cas()方法,用於分佈式鎖:
下篇文章咱們再細說!
第四步,使用redis的watch、multi、exec命令,用於分佈式鎖:
下篇文章咱們再細說!
第五步,總結:
綜上,對於分佈式鎖這些很是用或者實現起來比較重的方案,你們能夠根據本身在項目中的須要,酌情使用。最近在和別人討論的過程當中,以及個人第一篇關於分佈式鎖的文章分佈式鎖1 Java經常使用技術方案 你們的回覆中,總結來看,對於用redis實現分佈式鎖確實存在着比較多的細節問題能夠進行深刻討論,歡迎你們留言,相互學習。
忍不住嘚瑟一下,我媳婦兒此刻在我旁邊看AbstractQueuedSynchronizer,厲害吧?!,一下子出去吃飯,哈哈~
第六步,線上使用補充篇:
截止到2017.08.25(週五),使用上述文章中的」臨時節點+watcher機制方案」解決一個分佈式鎖的問題時,最終發如今實現過程當中,因爲watcher機制相似於通知等待機制的特色,若是主線程在經歷「獲取鎖操做」、「處理業務代碼」、「釋放鎖操做」這三步的過程當中,使用watcher機制阻塞的獲取鎖時,會致使根本沒法將獲取鎖結果返回給主線程,而在實際的時候過程當中,通常狀況下主線程在「獲取鎖操做」時都但願能夠同步得到一個返回值。
因此,上述的」臨時節點+watcher機制方案」從技術方案角度足夠完美,可是在實際使用過程當中,我的以爲還不是特別的方便。