zookeeper之分佈式Barrier

java併發編程學習之CyclicBarrier這邊提到了單機應用的Barrier,在分佈式系統中,curator也實現了分佈式Barrier。curator提供了DistributedBarrier和DistributedDoubleBarrier兩種方式。java

DistributedBarrier

MyDistributedBarrienode

public class MyDistributedBarrier {  
    private static final String path = "/barrier";  
    static DistributedBarrier distributedBarrier;  
    public static void distributedBarrier() throws Exception {  
  
        for (int i = 0; i < 5; i++) {  
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework client = CuratorConnect.getCuratorClient2();
                    distributedBarrier = new DistributedBarrier(client,path);
                    try {
                        distributedBarrier.setBarrier();
                        System.out.println("準備中");
                        distributedBarrier.waitOnBarrier();
                        System.out.println("結束了");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }  
        TimeUnit.SECONDS.sleep(5);  
        distributedBarrier.removeBarrier();  
    }  
  
    public static void main(String[] args) throws Exception {  
        MyDistributedBarrier.distributedBarrier();  
    }  
}

運行結果以下:
image.png編程

源碼分析

大體流程:segmentfault

  1. 新增節點,若是已經新增了,就忽略
  2. 監聽節點,自旋判斷節點是否被刪除,若是被刪除,則跳出循環

setBarrier,新增節點併發

public synchronized void setBarrier() throws Exception  
{  
    try  
  {  
        client.create().creatingParentContainersIfNeeded().forPath(barrierPath);  
    }  
    catch ( KeeperException.NodeExistsException ignore )  
    {  
        // ignore  
  }  
}

waitOnBarrier分佈式

public synchronized void waitOnBarrier() throws Exception  
{  
    waitOnBarrier(-1, null);  
}

public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) throws Exception  
{  
    long startMs = System.currentTimeMillis();  
    boolean hasMaxWait = (unit != null);  
    long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX\_VALUE;  
  
    boolean result;  
    for(;;)  
    {  
        // 若是result爲空,跳出循環,說明節點被刪除了。沒有爲空,就繼續監聽
        result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);  
        if ( result )  
        {  
            break;  
        }  
        // 休眠,等待喚醒
        if ( hasMaxWait )  
        {  
            long elapsed = System.currentTimeMillis() - startMs;  
            long thisWaitMs = maxWaitMs - elapsed;  
            if ( thisWaitMs <= 0 )  
            {  
                break;  
            }  
            wait(thisWaitMs);  
        }  
        else  
  {  
            wait();  
        }  
    }  
    return result;  
}

removeBarrier
刪除節點,這個時候,會觸發監聽,喚醒waitOnBarrier的等待。ide

public synchronized void removeBarrier() throws Exception  
{  
    try  
  {  
        client.delete().forPath(barrierPath);  
    }  
    catch ( KeeperException.NoNodeException ignore )  
    {  
        // ignore  
  }  
}

DistributedDoubleBarrier

MyDistributedDoubleBarrier源碼分析

public class MyDistributedDoubleBarrier {
    private static final String path = "double_barrier";
    public static void distributedDoubleBarrier() throws Exception {
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework client = CuratorConnect.getCuratorClient2();
                    DistributedDoubleBarrier doubleBarrier = new DistributedDoubleBarrier(client,path,5);
                    try {
                        System.out.println("準備中");
                        doubleBarrier.enter();
                        System.out.println("開始了");
                        doubleBarrier.leave();
                        System.out.println("結束了");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            }).start();
        }
    }

    public static void main(String[] args) throws Exception {
        MyDistributedDoubleBarrier.distributedDoubleBarrier();
    }
}

運行結果以下:
image.png學習

源碼分析

enter

主要思路就是在一個節點下面,建立預約數量的子節點,當子節點數量不超過預約節點時就堵塞,直到知足。
大概流程:this

  1. 判斷ready節點是否存在,若是存在,說明子節點數量已經夠了,就不用建立ready節點。
  2. 若是不存在,則自旋判斷,是否節點數量超過預約數量,沒有則休眠,有則跳出循環
public void enter() throws Exception  
{  
    enter(-1, null);  
}

public boolean enter(long maxWait, TimeUnit unit) throws Exception  
{  
    long startMs = System.currentTimeMillis();  
    boolean hasMaxWait = (unit != null);  
    long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;  
    // 判斷ready節點是否存在
    boolean readyPathExists = (client.checkExists().usingWatcher(watcher).forPath(readyPath) != null);  
    // 建立臨時節點
    client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath);  
    // 若是ready節點存在,就返回true,不存在調用internalEnter
    boolean result = (readyPathExists || internalEnter(startMs, hasMaxWait, maxWaitMs));  
    if ( connectionLost.get() )  
    {  
        throw new KeeperException.ConnectionLossException();  
    }  
  
    return result;  
}

private synchronized boolean internalEnter(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception  
{  
    boolean result = true;  
    do  
  {  
        // 獲取子節點的信息
        List<String>    children = getChildrenForEntering();  
        // 判斷是否已經建立夠子節點數量了
        int count = (children != null) ? children.size() : 0;  
        if ( count >= memberQty )  
        {  
            try  
  {  
                //建立夠了,則建立ready節點
                client.create().forPath(readyPath);  
            }  
            catch ( KeeperException.NodeExistsException ignore )  
            {  
                // ignore  
  }  
            break;  
        }  
        // 沒建立夠,休眠等待喚醒
        if ( hasMaxWait && !hasBeenNotified.get() )  
        {  
            long elapsed = System.currentTimeMillis() - startMs;  
            long thisWaitMs = maxWaitMs - elapsed;  
            if ( thisWaitMs <= 0 )  
            {  
                result = false;  
            }  
            else  
  {  
                wait(thisWaitMs);  
            }  
  
            if ( !hasBeenNotified.get() )  
            {  
                result = false;  
            }  
        }  
        else  
  {  
            wait();  
        }  
    } while ( false );  
  
    return result;  
}

leave

主要思路,就是子節點刪除完以前堵塞,刪除完了再刪除ready節點。
大概流程:

  1. 自旋
  2. 獲取子節點信息
  3. 過濾掉ready節點,並排序
  4. 若是子節點數量爲0,說明已經刪除完,則跳出循環
  5. 獲取當前節點的位置,若是是子節點就剩下一個,而且就是當前節點,則刪除跳出循環
  6. 若是子節點不止一個,看當前節點的位置,若是是第一個,就監聽最高節點,若是不是第一個,就監聽第一個節點,而且把本身節點刪除,進入休眠等到喚醒。這樣的話,就是除了第一個節點,其餘的節點都會刪除。最高節點的刪除會觸發第一個節點的喚醒事件,繼續判斷是否只有一個節點,是到5的步驟。刪除完後,不是第一個節點的其餘節點,跳到4的步驟
  7. 刪除ready節點。
public synchronized void leave() throws Exception  
{  
    leave(-1, null);  
}
public synchronized boolean leave(long maxWait, TimeUnit unit) throws Exception  
{  
    long startMs = System.currentTimeMillis();  
    boolean hasMaxWait = (unit != null);  
    long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;  
  
    return internalLeave(startMs, hasMaxWait, maxWaitMs);  
}

private boolean internalLeave(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception  
{  
    String          ourPathName = ZKPaths.getNodeFromPath(ourPath);  
    boolean ourNodeShouldExist = true;  
    boolean result = true;  
    for(;;)  
    {  
        if ( connectionLost.get() )  
        {  
            throw new KeeperException.ConnectionLossException();  
        }  
  
        List<String> children;  
        try  
  {  
            // 獲取節點信息
            children = client.getChildren().forPath(barrierPath);  
        }  
        catch ( KeeperException.NoNodeException dummy )  
        {  
            children = Lists.newArrayList();  
        }  
        // 過濾ready節點並排序
        children = filterAndSortChildren(children);  
        // 已經沒有子節點了,則跳出循環
        if ( (children == null) || (children.size() == 0) )  
        {  
            break;  
        }  
        // 當前節點的位置
        int ourIndex = children.indexOf(ourPathName);  
        // 小於0,說明當前節點不在子節點裏,拋異常
        if ( (ourIndex < 0) && ourNodeShouldExist )  
        {  
            if ( connectionLost.get() )  
            {  
                break;  // connection was lost but we've reconnected. However, our ephemeral node is gone  
  }  
            else  
  {  
                throw new IllegalStateException(String.format("Our path (%s) is missing", ourPathName));  
            }  
        }  
        // 節點數量爲1,判斷是不是當前節點,是的話則刪除
        if ( children.size() == 1 )  
        {  
            if ( ourNodeShouldExist && !children.get(0).equals(ourPathName) )  
            {  
                throw new IllegalStateException(String.format("Last path (%s) is not ours (%s)", children.get(0), ourPathName));  
            }  
            checkDeleteOurPath(ourNodeShouldExist);  
            break;  
        }  
  
        Stat            stat;  
        boolean IsLowestNode = (ourIndex == 0);
        // 當前節點是最小的,則監聽最大節點是否被刪除
        if ( IsLowestNode )  
        {  
            String  highestNodePath = ZKPaths.makePath(barrierPath, children.get(children.size() - 1));  
            stat = client.checkExists().usingWatcher(watcher).forPath(highestNodePath);  
        }  
        else  
  {  
            // 不是最小的,則監聽最小節點是否被刪除
            String  lowestNodePath = ZKPaths.makePath(barrierPath, children.get(0));  
            stat = client.checkExists().usingWatcher(watcher).forPath(lowestNodePath);  
            // 刪除本身的節點
            checkDeleteOurPath(ourNodeShouldExist);  
            ourNodeShouldExist = false;  
        }  
        // 休眠等喚醒
        if ( stat != null )  
        {  
            if ( hasMaxWait )  
            {  
                long elapsed = System.currentTimeMillis() - startMs;  
                long thisWaitMs = maxWaitMs - elapsed;  
                if ( thisWaitMs <= 0 )  
                {  
                    result = false;  
                }  
                else  
  {  
                    wait(thisWaitMs);  
                }  
            }  
            else  
  {  
                wait();  
            }  
        }  
    }  
  
    try  
  {  
        client.delete().forPath(readyPath);  
    }  
    catch ( KeeperException.NoNodeException ignore )  
    {  
        // ignore  
  }  
  
    return result;  
}
相關文章
相關標籤/搜索