《java併發編程學習之CyclicBarrier》一文提到了單機應用的Barrier;在分佈式系統中,Curator也實現了分佈式Barrier。Curator提供了DistributedBarrier和DistributedDoubleBarrier兩種方式。
MyDistributedBarrier
/**
 * Demo of Curator's {@code DistributedBarrier}: several worker threads block on a
 * barrier node until the controlling thread removes it.
 */
public class MyDistributedBarrier {
    /** ZooKeeper node whose existence represents the barrier. */
    private static final String path = "/barrier";

    /** Number of worker threads started by the demo. */
    private static final int WORKER_COUNT = 5;

    /**
     * Controller-side barrier handle. It is assigned exactly once, by the thread that
     * calls {@link #distributedBarrier()}, before any worker thread is started; worker
     * threads never write to it, so there is no race on this field.
     */
    static DistributedBarrier distributedBarrier;

    /**
     * Sets the barrier, starts {@code WORKER_COUNT} threads that wait on it, sleeps
     * five seconds and then removes the barrier so the workers can proceed.
     *
     * @throws Exception if setting or removing the barrier fails, or the sleep is interrupted
     */
    public static void distributedBarrier() throws Exception {
        // Create and set the barrier on the controlling thread first, so that every
        // worker is guaranteed to find it in place and removeBarrier() below always
        // operates on a fully initialised, non-null instance.
        distributedBarrier = new DistributedBarrier(CuratorConnect.getCuratorClient2(), path);
        distributedBarrier.setBarrier();

        try {
            for (int i = 0; i < WORKER_COUNT; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        CuratorFramework client = CuratorConnect.getCuratorClient2();
                        // Each worker waits on its own barrier object; sharing one through
                        // the static field would let the threads overwrite each other.
                        DistributedBarrier workerBarrier = new DistributedBarrier(client, path);
                        try {
                            System.out.println("準備中");
                            workerBarrier.waitOnBarrier();
                            System.out.println("結束了");
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
            TimeUnit.SECONDS.sleep(5);
        } finally {
            // Always release the workers, even if the sleep above is interrupted.
            distributedBarrier.removeBarrier();
        }
    }

    public static void main(String[] args) throws Exception {
        MyDistributedBarrier.distributedBarrier();
    }
}
運行結果如下:
編程
大致流程:
setBarrier,新增節點
public synchronized void setBarrier() throws Exception { try { client.create().creatingParentContainersIfNeeded().forPath(barrierPath); } catch ( KeeperException.NodeExistsException ignore ) { // ignore } }
waitOnBarrier
public synchronized void waitOnBarrier() throws Exception { waitOnBarrier(-1, null); } public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); boolean hasMaxWait = (unit != null); long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX\_VALUE; boolean result; for(;;) { // 若是result爲空,跳出循環,說明節點被刪除了。沒有爲空,就繼續監聽 result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null); if ( result ) { break; } // 休眠,等待喚醒 if ( hasMaxWait ) { long elapsed = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsed; if ( thisWaitMs <= 0 ) { break; } wait(thisWaitMs); } else { wait(); } } return result; }
removeBarrier
刪除節點,這個時候,會觸發監聽,喚醒waitOnBarrier的等待。
public synchronized void removeBarrier() throws Exception { try { client.delete().forPath(barrierPath); } catch ( KeeperException.NoNodeException ignore ) { // ignore } }
MyDistributedDoubleBarrier
/**
 * Demo of Curator's {@code DistributedDoubleBarrier}: a group of threads enter the
 * barrier together and later leave it together.
 */
public class MyDistributedDoubleBarrier {
    /**
     * Parent node of the double barrier. ZooKeeper paths must be absolute, so the
     * value has to start with a slash.
     */
    private static final String path = "/double_barrier";

    /**
     * Number of participants. Used both as the number of threads started and as the
     * member quantity passed to the barrier, so the two can never drift apart.
     */
    private static final int MEMBER_COUNT = 5;

    /**
     * Starts {@code MEMBER_COUNT} threads; each one enters the double barrier, prints a
     * message, and then leaves the barrier again.
     *
     * @throws Exception declared for parity with the other demo entry points
     */
    public static void distributedDoubleBarrier() throws Exception {
        for (int i = 0; i < MEMBER_COUNT; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework client = CuratorConnect.getCuratorClient2();
                    DistributedDoubleBarrier doubleBarrier =
                            new DistributedDoubleBarrier(client, path, MEMBER_COUNT);
                    try {
                        System.out.println("準備中");
                        doubleBarrier.enter();
                        System.out.println("開始了");
                        doubleBarrier.leave();
                        System.out.println("結束了");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    public static void main(String[] args) throws Exception {
        MyDistributedDoubleBarrier.distributedDoubleBarrier();
    }
}
運行結果如下:
學習
enter的主要思路就是在一個節點下面,創建預定數量的子節點,當子節點數量未達到預定數量時就堵塞,直到滿足。
大概流程:
public void enter() throws Exception { enter(-1, null); } public boolean enter(long maxWait, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); boolean hasMaxWait = (unit != null); long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE; // 判斷ready節點是否存在 boolean readyPathExists = (client.checkExists().usingWatcher(watcher).forPath(readyPath) != null); // 建立臨時節點 client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath); // 若是ready節點存在,就返回true,不存在調用internalEnter boolean result = (readyPathExists || internalEnter(startMs, hasMaxWait, maxWaitMs)); if ( connectionLost.get() ) { throw new KeeperException.ConnectionLossException(); } return result; } private synchronized boolean internalEnter(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception { boolean result = true; do { // 獲取子節點的信息 List<String> children = getChildrenForEntering(); // 判斷是否已經建立夠子節點數量了 int count = (children != null) ? children.size() : 0; if ( count >= memberQty ) { try { //建立夠了,則建立ready節點 client.create().forPath(readyPath); } catch ( KeeperException.NodeExistsException ignore ) { // ignore } break; } // 沒建立夠,休眠等待喚醒 if ( hasMaxWait && !hasBeenNotified.get() ) { long elapsed = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsed; if ( thisWaitMs <= 0 ) { result = false; } else { wait(thisWaitMs); } if ( !hasBeenNotified.get() ) { result = false; } } else { wait(); } } while ( false ); return result; }
leave的主要思路,就是子節點刪除完之前堵塞,刪除完了再刪除ready節點。
大概流程:
public synchronized void leave() throws Exception { leave(-1, null); } public synchronized boolean leave(long maxWait, TimeUnit unit) throws Exception { long startMs = System.currentTimeMillis(); boolean hasMaxWait = (unit != null); long maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE; return internalLeave(startMs, hasMaxWait, maxWaitMs); } private boolean internalLeave(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception { String ourPathName = ZKPaths.getNodeFromPath(ourPath); boolean ourNodeShouldExist = true; boolean result = true; for(;;) { if ( connectionLost.get() ) { throw new KeeperException.ConnectionLossException(); } List<String> children; try { // 獲取節點信息 children = client.getChildren().forPath(barrierPath); } catch ( KeeperException.NoNodeException dummy ) { children = Lists.newArrayList(); } // 過濾ready節點並排序 children = filterAndSortChildren(children); // 已經沒有子節點了,則跳出循環 if ( (children == null) || (children.size() == 0) ) { break; } // 當前節點的位置 int ourIndex = children.indexOf(ourPathName); // 小於0,說明當前節點不在子節點裏,拋異常 if ( (ourIndex < 0) && ourNodeShouldExist ) { if ( connectionLost.get() ) { break; // connection was lost but we've reconnected. 
However, our ephemeral node is gone } else { throw new IllegalStateException(String.format("Our path (%s) is missing", ourPathName)); } } // 節點數量爲1,判斷是不是當前節點,是的話則刪除 if ( children.size() == 1 ) { if ( ourNodeShouldExist && !children.get(0).equals(ourPathName) ) { throw new IllegalStateException(String.format("Last path (%s) is not ours (%s)", children.get(0), ourPathName)); } checkDeleteOurPath(ourNodeShouldExist); break; } Stat stat; boolean IsLowestNode = (ourIndex == 0); // 當前節點是最小的,則監聽最大節點是否被刪除 if ( IsLowestNode ) { String highestNodePath = ZKPaths.makePath(barrierPath, children.get(children.size() - 1)); stat = client.checkExists().usingWatcher(watcher).forPath(highestNodePath); } else { // 不是最小的,則監聽最小節點是否被刪除 String lowestNodePath = ZKPaths.makePath(barrierPath, children.get(0)); stat = client.checkExists().usingWatcher(watcher).forPath(lowestNodePath); // 刪除本身的節點 checkDeleteOurPath(ourNodeShouldExist); ourNodeShouldExist = false; } // 休眠等喚醒 if ( stat != null ) { if ( hasMaxWait ) { long elapsed = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsed; if ( thisWaitMs <= 0 ) { result = false; } else { wait(thisWaitMs); } } else { wait(); } } } try { client.delete().forPath(readyPath); } catch ( KeeperException.NoNodeException ignore ) { // ignore } return result; }