前言:在zk中,當有節點新增,刪除,或者節點內容發生改變的時候,只要對節點註冊了監聽事件,那麼當發生上述節點變化的時候,zk會自動觸發監聽事件並通知客戶端,客戶端拿到對應事件通知後,就能夠作相應的業務處理java
本文涉及到的節點:node
1.父節點:/disLocks1(zk根目錄下的disLocks1目錄,CreateMode.PERSISTENT類型)apache
2.全部須要獲取鎖的線程,都會在/disLocks1目錄下創建一個臨時順序的子節點(CreateMode.EPHEMERAL_SEQUENTIAL類型)緩存
3.每次都是序號最小的節點獲取鎖,當最小的節點業務邏輯處理完畢後,斷開本次鏈接(或者刪除當前子節點),則臨時順序的節點自動刪除,接着讓其餘沒有獲取鎖的節點去獲取鎖多線程
貼代碼:併發
一個JVM,10個線程併發獲取鎖()多jvm,只須要事先創建父節點便可jvm
package zoo.com.max.zoo.lock; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * * zk分佈式鎖實現 * 基於監聽父節點下面的所有子節點實現,效率較低 * */ public class DistributedLock implements Watcher{ public static String host="127.0.0.1:2181"; //緩存時間 private static final int TIME_OUT = 2000; private static String FATHER_PATH = "/disLocks1"; private ZooKeeper zk; private int threadId; protected CountDownLatch countDownLatch=new CountDownLatch(1); public DistributedLock(int threadId){ this.threadId = threadId; } //獲取zk鏈接 public void getZkClient(String host,int timeout) { try { if(null == zk){ zk = new ZooKeeper(host, timeout, this); } } catch (IOException e) { e.printStackTrace(); } } /** * 建立子節點 * * */ public String createNode(){ try { //檢測節點是否存在 Stat stat = zk.exists(FATHER_PATH, false); //父節點不存在,則建立父節點,防止多線程併發建立父節點,因此加上同步代碼塊,防止在同一個jvm中的併發建立,多jvm環境下, 父節點能夠事先建立好 if(Objects.isNull(stat)){ synchronized (FATHER_PATH) { Stat stat2 = zk.exists(FATHER_PATH, false); if(Objects.isNull(stat2)){ //父節點是持久節點 String path = zk.create(FATHER_PATH, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("父節點建立成功,返回值【"+path+"】"); } } } //建立持久性父節點下面的臨時順序子節點,/父節點路徑/0000000002 String lockPath = zk.create(FATHER_PATH+"/",null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("線程【"+threadId+"】開始執行,子節點建立成功,返回值【"+lockPath+"】"); return lockPath; } catch (KeeperException e1) { e1.printStackTrace(); } catch (InterruptedException e1) { e1.printStackTrace(); } return null; } //校驗當前節點是否爲序號最小的節點 public boolean checkLockPath(String lockPath){ try { //註冊父節點監聽事件,當父節點下面的子節點有變化,就會觸發Watcher事件 List<String> nodeList = zk.getChildren(FATHER_PATH, this); Collections.sort(nodeList); int index = nodeList.indexOf( lockPath.substring(FATHER_PATH.length()+1)); switch (index){ case -1:{ System.out.println("本節點已不在了"+lockPath); return false; } case 0:{ System.out.println("線程【"+threadId+"】獲取鎖成功,子節點序號【"+lockPath+"】"); return true; } default:{ String waitPath = nodeList.get(index - 1); System.out.println(waitPath+"在"+nodeList.get(index)+"點前面,須要等待【"+nodeList.get(index)+"】"); return false; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public boolean getLock(){ //建立獲取鎖的節點(順序臨時節點) String childPath = createNode(); boolean flag = true; if(null != childPath){ try { //輪詢等待zk獲取鎖的通知 while(flag){ if(checkLockPath(childPath)){ //獲取鎖成功 return true; }else{ //節點建立成功, 則等待zk通知 countDownLatch.await(); } } } catch (InterruptedException e) { e.printStackTrace(); } }else{ System.out.println("節點沒有建立成功,獲取鎖失敗"); } return false; } public void process(WatchedEvent event) { //成功鏈接zk,狀態判斷 if(event.getState() == KeeperState.SyncConnected){ //子節點有變化 if(event.getType() == EventType.NodeChildrenChanged){ countDownLatch.countDown(); } } } public void unlock(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public ZooKeeper getZooKeeper(){ return zk; } public static void main(String[] args) throws KeeperException, InterruptedException { for(int i=0; i < 10; i++){ final int threadId = i+1; new Thread(){ @Override public void run() { try{ DistributedLock dis = new DistributedLock(threadId); dis.getZkClient(host,TIME_OUT); if(dis.getLock()){ Thread.sleep(200); dis.unlock(); } } catch (Exception e){ System.out.println("【第"+threadId+"個線程】 拋出的異常:"); e.printStackTrace(); } } }.start(); } } }
第二遍會改進爲向子節點註冊監聽事件, 這樣就不用全部子節點都去向父節點註冊事件,子節點只會在本身前面一個節點註冊節點刪除事件分佈式
新手碼農,若有錯誤,但願你們多多指教,共同進步ide