zookeeper之分佈式鎖

分佈式鎖

在分佈式系統中,對一個共享資源的互斥訪問操做,就能夠用分佈式鎖來解決,相似於單機的Lock,分爲排他鎖和讀寫鎖。node

排他鎖

特色:segmentfault

  • 只能一個服務器對共享資源操做
  • 擁有操做資源的服務器釋放資源,其餘服務器能夠知道,並去搶資源

是否是和以前的zookeeper之master選舉很像。
因此zookeeper的如下特性,也是能夠知足分佈式鎖服務器

  • 強一致性:知足只能一個服務器搶到共享資源
  • 臨時節點:釋放資源,臨時節點隨着會話的關閉而消失
  • Watcher:監聽節點的變化,資源被釋放,就會被監聽到

InterProcessMutexacquirerelease方法,在zookeeper之master選舉提過。
DistributeLockdom

public class DistributeLock { 
    static CuratorFramework client = CuratorConnect.getCuratorClient2();  
    private static final String path = "distributeLock";  
  
    public static void distributeLock() {  
        InterProcessMutex mutex = new InterProcessMutex(client, path);  
        CountDownLatch countDownLatch = new CountDownLatch(50);  
        for (int i = 0; i < 50; i++) {  
            new Thread(new Runnable() {
                public void run() {
                    try {
                        countDownLatch.countDown();  
                        countDownLatch.await();  
                        mutex.acquire();  
                        System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + "獲取到鎖");  
                        TimeUnit.SECONDS.sleep(1);  
                        mutex.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }  
  
    public static void main(String[] args) {  
        DistributeLock.distributeLock();  
    }  
}

運行結果以下(部分):
image.png
能夠看出,都是間隔一秒。源碼已經解析過了,這邊不在講解。分佈式

讀寫鎖

ReadWriteLockide

public class ReadWriteLock {  
    static CuratorFramework client \= CuratorConnect.getCuratorClient2();  
    private static final String path \= "/readWriteLock";  
  
    public static void readWriteLock() {  
        InterProcessReadWriteLock mutex = new InterProcessReadWriteLock(client, path);  
        CountDownLatch countDownLatch = new CountDownLatch(50);  
        for (int i = 0; i < 50; i++) {  
            new Thread(new Runnable() {  
 @Override  public void run() {  
  try {  
 countDownLatch.countDown(); countDownLatch.await();  int i = new Random().nextInt();  
  if (i % 9 \== 0) {  
 mutex.writeLock().acquire(); System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + "獲取到寫鎖");  
 TimeUnit.SECONDS.sleep(1);  
 mutex.writeLock().release(); }else{  
 mutex.readLock().acquire(); System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + "獲取到讀鎖");  
 TimeUnit.SECONDS.sleep(1);  
 mutex.readLock().release(); } } catch (InterruptedException e) {  
 e.printStackTrace(); } catch (Exception e) {  
 e.printStackTrace(); } } }).start();  
        }  
    }  
  
    public static void main(String\[\] args) {  
        ReadWriteLock.readWriteLock();  
    }  
}

運行結果以下(部分):
image.png
能夠看出,讀的時候,沒有間隔,第四行寫的時候,間隔了一秒。源碼分析

源碼分析

public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)  
{  
    lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);  
   
    writeMutex = new InternalInterProcessMutex  
    (  
        client,  
        basePath,  
        WRITE_LOCK_NAME,  
        lockData,  
        1,  
        new SortingLockInternalsDriver()  
        {  
            @Override  
  public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception  
            {  
                return super.getsTheLock(client, children, sequenceNodeName, maxLeases); // 寫鎖,跟排他鎖是同樣的 
            }  
        }  
    );  
  
    readMutex = new InternalInterProcessMutex  
    (  
        client,  
        basePath,  
        READ_LOCK_NAME,  
        lockData,  
        Integer.MAX_VALUE,  
        new SortingLockInternalsDriver()  
        {  
            @Override  
  public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception  
            {  
                return readLockPredicate(children, sequenceNodeName);  
            }  
        }  
    );  
}

readLockPredicateui

private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception  
{  
    // 是當前寫鎖,返回
    if ( writeMutex.isOwnedByCurrentThread() )  
    {  
        return new PredicateResults(null, true);  
    }  
  
    int index = 0;  
    int firstWriteIndex = Integer.MAX_VALUE;  
    int ourIndex = -1;  
    // 遍歷全部子節點
    for ( String node : children )  
    {  
        // 若是節點有包含__WRIT__,獲取最小寫的節點位置
        if ( node.contains(WRITE_LOCK_NAME) )  
        {  
            firstWriteIndex = Math.min(index, firstWriteIndex);  
        }  
        // 獲取當前節點的位置
        else if ( node.startsWith(sequenceNodeName) )  
        {  
            ourIndex = index;  
            break;  
        }  
  
        ++index;  
    }  
  
    StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);  
    // 當前的節點位置,若是在最小的寫節點以前,就獲取到寫,若是沒有,就監聽這個寫節點
    boolean getsTheLock = (ourIndex < firstWriteIndex);  
    String      pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);  
    return new PredicateResults(pathToWatch, getsTheLock);  
}

若是是讀請求,就會判斷前面的節點是否有寫請求,若是沒有,就獲取到鎖,若是有,就監聽寫節點。
若是是寫請求,就走排他鎖的流程。spa

相關文章
相關標籤/搜索