在分佈式系統中,對一個共享資源的互斥訪問操做,就能夠用分佈式鎖來解決,相似於單機的Lock,分爲排他鎖和讀寫鎖。node
特色:segmentfault
是否是和以前的zookeeper之master選舉很像。
因此zookeeper的如下特性,也是能夠知足分佈式鎖服務器
InterProcessMutex
的acquire
和release
方法,在zookeeper之master選舉提過。
DistributeLockdom
public class DistributeLock { static CuratorFramework client = CuratorConnect.getCuratorClient2(); private static final String path = "distributeLock"; public static void distributeLock() { InterProcessMutex mutex = new InterProcessMutex(client, path); CountDownLatch countDownLatch = new CountDownLatch(50); for (int i = 0; i < 50; i++) { new Thread(new Runnable() { public void run() { try { countDownLatch.countDown(); countDownLatch.await(); mutex.acquire(); System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + "獲取到鎖"); TimeUnit.SECONDS.sleep(1); mutex.release(); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }).start(); } } public static void main(String[] args) { DistributeLock.distributeLock(); } }
運行結果以下(部分):
能夠看出,都是間隔一秒。源碼已經解析過了,這邊不在講解。分佈式
ReadWriteLockide
public class ReadWriteLock { static CuratorFramework client \= CuratorConnect.getCuratorClient2(); private static final String path \= "/readWriteLock"; public static void readWriteLock() { InterProcessReadWriteLock mutex = new InterProcessReadWriteLock(client, path); CountDownLatch countDownLatch = new CountDownLatch(50); for (int i = 0; i < 50; i++) { new Thread(new Runnable() { @Override public void run() { try { countDownLatch.countDown(); countDownLatch.await(); int i = new Random().nextInt(); if (i % 9 \== 0) { mutex.writeLock().acquire(); System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + "獲取到寫鎖"); TimeUnit.SECONDS.sleep(1); mutex.writeLock().release(); }else{ mutex.readLock().acquire(); System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + "獲取到讀鎖"); TimeUnit.SECONDS.sleep(1); mutex.readLock().release(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }).start(); } } public static void main(String\[\] args) { ReadWriteLock.readWriteLock(); } }
運行結果以下(部分):
能夠看出,讀的時候,沒有間隔,第四行寫的時候,間隔了一秒。源碼分析
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) { lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length); writeMutex = new InternalInterProcessMutex ( client, basePath, WRITE_LOCK_NAME, lockData, 1, new SortingLockInternalsDriver() { @Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { return super.getsTheLock(client, children, sequenceNodeName, maxLeases); // 寫鎖,跟排他鎖是同樣的 } } ); readMutex = new InternalInterProcessMutex ( client, basePath, READ_LOCK_NAME, lockData, Integer.MAX_VALUE, new SortingLockInternalsDriver() { @Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { return readLockPredicate(children, sequenceNodeName); } } ); }
readLockPredicateui
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception { // 是當前寫鎖,返回 if ( writeMutex.isOwnedByCurrentThread() ) { return new PredicateResults(null, true); } int index = 0; int firstWriteIndex = Integer.MAX_VALUE; int ourIndex = -1; // 遍歷全部子節點 for ( String node : children ) { // 若是節點有包含__WRIT__,獲取最小寫的節點位置 if ( node.contains(WRITE_LOCK_NAME) ) { firstWriteIndex = Math.min(index, firstWriteIndex); } // 獲取當前節點的位置 else if ( node.startsWith(sequenceNodeName) ) { ourIndex = index; break; } ++index; } StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex); // 當前的節點位置,若是在最小的寫節點以前,就獲取到寫,若是沒有,就監聽這個寫節點 boolean getsTheLock = (ourIndex < firstWriteIndex); String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex); return new PredicateResults(pathToWatch, getsTheLock); }
若是是讀請求,就會判斷前面的節點是否有寫請求,若是沒有,就獲取到鎖,若是有,就監聽寫節點。
若是是寫請求,就走排他鎖的流程。spa