目前幾乎不少大型網站及應用都是分佈式部署的,分佈式場景中的數據一致性問題一直是一個比較重要的話題。分佈式的CAP理論告訴咱們「任何一個分佈式系統都沒法同時知足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多隻能同時知足兩項。」因此,不少系統在設計之初就要對這三者作出取捨。在互聯網領域的絕大多數的場景中,都須要犧牲強一致性來換取系統的高可用性,系統每每只須要保證「最終一致性」,只要這個最終時間是在用戶能夠接受的範圍內便可。java
在不少場景中,咱們爲了保證數據的最終一致性,須要不少的技術方案來支持,好比分佈式事務、分佈式鎖等。有的時候,咱們須要保證一個方法在同一時間內只能被同一個線程執行。在單機環境中,Java中其實提供了不少併發處理相關的API,可是這些API在分佈式場景中就無能爲力了。也就是說單純的Java Api並不能提供分佈式鎖的能力。因此針對分佈式鎖的實現目前有多種方案。redis
針對分佈式鎖的實現,目前比較經常使用的有如下幾種方案:數據庫
基於數據庫實現分佈式鎖 基於緩存(redis,memcached,tair)實現分佈式鎖 基於Zookeeper實現分佈式鎖apache
1、zookeeper中分佈式鎖實現原理緩存
(1)、普通節點思路併發
如今模擬一個使用Zookeeper實現分佈式鎖,假設有A,B,C三臺客戶端去訪問資源,調用zookeeper得到鎖。客戶端三個在zookeeper的 /locks節點下建立一個/lock節點,因爲節點是惟一性的特性,只有一我的會建立成功,其他兩個建立失敗,會進入監聽/locks節點的變化,若是/locks下子節點/lock節點發生變化,其他兩個能夠去拿鎖,這樣是否好呢? 這樣子會致使驚羣效應。就是一個觸發使得在短期呢會觸發大量的watcher事件,可是隻有一個客戶端能拿到鎖less
--衆人搶,大量watcher事件分佈式
(2)、有序節點思路ide
一、在獲取分佈式鎖的時候在locker節點下建立臨時順序節點,釋放鎖的時候刪除該臨時節點。memcached
二、客戶端調用createNode方法在locks下建立臨時順序節點,而後調用getChildren(「locks」)來獲取locks下面的全部子節點,注意此時不用設置任何Watcher。
三、客戶端獲取到全部的子節點path以後,若是發現本身建立的子節點序號最小,那麼就認爲該客戶端獲取到了鎖。
四、若是發現本身建立的節點並不是locks全部子節點中最小的,說明本身尚未獲取到鎖,此時客戶端須要找到比本身小的那個節點,而後對其調用exist()方法,同時對其註冊事件監聽器。
五、以後,讓這個被關注的節點刪除,則客戶端的Watcher會收到相應通知,此時再次判斷本身建立的節點是不是locks子節點中序號最小的,若是是則獲取到了鎖,若是不是則重複以上步驟繼續獲取到比本身小的一個節點並註冊監聽。
2、代碼實現
package com.lf.zookeeper.lock; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /* *實現分佈式鎖 */ public class DestributeLock implements Lock,Watcher{ private ZooKeeper zk = null; private String ROOT_LOCK ="/locks";//定義根節點 private String CURRENT_LOCK;//當前鎖 private String WAIT_LOCK ;//等待前一個對象釋放鎖 private CountDownLatch countDownLatch; public DestributeLock() { try { zk= new ZooKeeper("192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181", 4000, this); //判斷根節點是否存在 Stat stat = zk.exists(ROOT_LOCK, false); if(stat==null){ zk.create(ROOT_LOCK, "1".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT); } } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { if(countDownLatch != null){ this.countDownLatch.countDown(); } } @Override public void lock() { if(tryLock()){ System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+",獲取鎖成功!"); return; } try { waitForLock(WAIT_LOCK);//若是沒有得到鎖,繼續等待 } catch (Exception e) { e.printStackTrace(); } } private boolean waitForLock(String prev) throws Exception, InterruptedException { Stat stat = zk.exists(prev, true); if(stat!=null){ System.out.println(Thread.currentThread().getName()+"->等待"+ROOT_LOCK+prev+"釋放鎖"); countDownLatch = new CountDownLatch(1); countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"->"+"得到鎖成功!"); } return true; } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } @Override public boolean tryLock() { // TODO Auto-generated method stub try { CURRENT_LOCK= zk.create(ROOT_LOCK+"/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+",嘗試競爭鎖!"); //獲取根節點下的全部子節點 List<String> childrens = zk.getChildren(ROOT_LOCK, false); SortedSet<String> sortedSet = new TreeSet();//定義一個有序的集合進行排序 for (String children : childrens) { sortedSet.add(ROOT_LOCK+"/"+children); } //獲取最小的子節點 String firstNode = sortedSet.first(); SortedSet<String> lessthanMe = sortedSet.headSet(CURRENT_LOCK); if(CURRENT_LOCK.equals(firstNode)){//當前節點和最小鎖比較,若是相同,則獲取鎖成功 return true; } if(!lessthanMe.isEmpty()){ WAIT_LOCK = lessthanMe.last();//獲取比當前節點更小的最後一個節點,設置給WAIT_LOCK } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } return false; } @Override public boolean tryLock(long arg0, TimeUnit arg1) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public void unlock() { System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"釋放鎖"); try { zk.delete(CURRENT_LOCK, -1); CURRENT_LOCK = null; zk.close(); } catch (Exception e) { // TODO: handle exception } } }
測試類
package com.lf.zookeeper.lock; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class LockDemo { public static void main(String[] args) throws IOException { CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 1; i <= 10; i++) { new Thread(()->{ try { countDownLatch.await(); DestributeLock destributeLock = new DestributeLock(); destributeLock.lock(); } catch (Exception e) { e.printStackTrace(); } },"Thread-"+i).start(); countDownLatch.countDown(); } System.in.read(); } }
運行結果
Thread-4->/locks/0000000072,嘗試競爭鎖! Thread-5->/locks/0000000073,嘗試競爭鎖! Thread-9->/locks/0000000074,嘗試競爭鎖! Thread-8->/locks/0000000075,嘗試競爭鎖! Thread-10->/locks/0000000076,嘗試競爭鎖! Thread-2->/locks/0000000077,嘗試競爭鎖! Thread-6->/locks/0000000078,嘗試競爭鎖! Thread-3->/locks/0000000079,嘗試競爭鎖! Thread-7->/locks/0000000080,嘗試競爭鎖! Thread-1->/locks/0000000071,獲取鎖成功! Thread-4->等待/locks/locks/0000000071釋放鎖 Thread-5->等待/locks/locks/0000000072釋放鎖 Thread-9->等待/locks/locks/0000000073釋放鎖 Thread-8->等待/locks/locks/0000000074釋放鎖 Thread-10->等待/locks/locks/0000000075釋放鎖 Thread-2->等待/locks/locks/0000000076釋放鎖 Thread-6->等待/locks/locks/0000000077釋放鎖 Thread-3->等待/locks/locks/0000000078釋放鎖 Thread-7->等待/locks/locks/0000000079釋放鎖
手動觸發watcher事件,釋放鎖,delete /locks/0000000071
出現 Thread-4->/locks/0000000072,獲取鎖成功!
3、基於curator的實現分佈式鎖
代碼
package com.lf.zookeeper.lock; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; public class CuratorLockDemo { public static void main(String[] args) { CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().build(); InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/locks");//關注節點 try { interProcessMutex.acquire();//獲取鎖 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }