基於zookeeper的分佈式鎖實現(監聽父節點)

    前言:在zk中,當有節點新增,刪除,或者節點內容發生改變的時候,只要對節點註冊了監聽事件,那麼當發生上述節點變化的時候,zk會自動觸發監聽事件並通知客戶端,客戶端拿到對應事件通知後,就能夠作相應的業務處理java

    本文涉及到的節點:node

           1.父節點:/disLocks1(zk根目錄下的disLocks1目錄,CreateMode.PERSISTENT類型)apache

      2.全部須要獲取鎖的線程,都會在/disLocks1目錄下創建一個臨時順序的子節點(CreateMode.EPHEMERAL_SEQUENTIAL類型)緩存

           3.每次都是序號最小的節點獲取鎖,當最小的節點業務邏輯處理完畢後,斷開本次鏈接(或者刪除當前子節點),則臨時順序的節點自動刪除,接着讓其餘沒有獲取鎖的節點去獲取鎖多線程

貼代碼:併發

一個JVM,10個線程併發獲取鎖()多jvm,只須要事先創建父節點便可jvm

package zoo.com.max.zoo.lock;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;


/**
 * 
 * zk分佈式鎖實現
 * 基於監聽父節點下面的所有子節點實現,效率較低
 * */
public class DistributedLock implements Watcher{
	
	public static String host="127.0.0.1:2181";
    //緩存時間  
    private static final int TIME_OUT   = 2000; 
    
    private static String FATHER_PATH = "/disLocks1";
    
    private ZooKeeper zk;
    
    private int threadId;
    
    protected  CountDownLatch countDownLatch=new CountDownLatch(1); 
    
    public DistributedLock(int threadId){
    	this.threadId = threadId;
    }
    //獲取zk鏈接
    public void getZkClient(String host,int timeout)
	{
    	try {
    		if(null == zk){
        		zk = new ZooKeeper(host, timeout, this);
    		}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	/**
	 * 建立子節點
	 * 
	 * */
    public String createNode(){
    	try {
    		//檢測節點是否存在
			Stat stat = zk.exists(FATHER_PATH, false);
			//父節點不存在,則建立父節點,防止多線程併發建立父節點,因此加上同步代碼塊,防止在同一個jvm中的併發建立,多jvm環境下, 父節點能夠事先建立好
			if(Objects.isNull(stat)){
				synchronized (FATHER_PATH) {
					Stat stat2 = zk.exists(FATHER_PATH, false);
					if(Objects.isNull(stat2)){
						//父節點是持久節點
						String path = zk.create(FATHER_PATH, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
						System.out.println("父節點建立成功,返回值【"+path+"】");
					}

				}
			}
			//建立持久性父節點下面的臨時順序子節點,/父節點路徑/0000000002
	        String lockPath = zk.create(FATHER_PATH+"/",null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  
        	System.out.println("線程【"+threadId+"】開始執行,子節點建立成功,返回值【"+lockPath+"】");
        	return lockPath;
		} catch (KeeperException e1) {
			e1.printStackTrace();
		} catch (InterruptedException e1) {
			e1.printStackTrace();
		}
		return null;
    }
    
    //校驗當前節點是否爲序號最小的節點
    public boolean checkLockPath(String lockPath){
    	try {
    		//註冊父節點監聽事件,當父節點下面的子節點有變化,就會觸發Watcher事件
			List<String> nodeList = zk.getChildren(FATHER_PATH, this);
			Collections.sort(nodeList);
			int index = nodeList.indexOf( lockPath.substring(FATHER_PATH.length()+1));  
	         switch (index){  
	             case -1:{  
	            	 System.out.println("本節點已不在了"+lockPath);
	                 return false;  
	             }  
	             case 0:{  
	            	 System.out.println("線程【"+threadId+"】獲取鎖成功,子節點序號【"+lockPath+"】");
	                 return true;  
	             }  
	             default:{  
	                 String waitPath = nodeList.get(index - 1);  
	                 System.out.println(waitPath+"在"+nodeList.get(index)+"點前面,須要等待【"+nodeList.get(index)+"】");
	                 return false;
	             } 
	         }
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return false;
    }
    
    public boolean getLock(){
    	//建立獲取鎖的節點(順序臨時節點)
    	String childPath = createNode();
    	boolean flag = true;
    	if(null != childPath){
			try {
				//輪詢等待zk獲取鎖的通知
				while(flag){
					if(checkLockPath(childPath)){
						//獲取鎖成功
						return true;
					}else{
						//節點建立成功, 則等待zk通知
						countDownLatch.await();
					}
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
    	}else{
    		System.out.println("節點沒有建立成功,獲取鎖失敗");
    	}
    	return false;
    }
    
	public void process(WatchedEvent event) {
		//成功鏈接zk,狀態判斷
		if(event.getState() == KeeperState.SyncConnected){
			//子節點有變化
			if(event.getType() == EventType.NodeChildrenChanged){
				countDownLatch.countDown();
			}
		}		
	}
	
	public void unlock(){
		try {
			zk.close();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	
	public ZooKeeper getZooKeeper(){
		return zk;
	}
	public static void main(String[] args) throws KeeperException, InterruptedException {
		 for(int i=0; i < 10; i++){  
	            final int threadId = i+1;  
	            new Thread(){  
	                @Override  
	                public void run() {  
	                    try{  
	                        DistributedLock dis = new DistributedLock(threadId);  
	                        dis.getZkClient(host,TIME_OUT);
	                        if(dis.getLock()){
	                        	Thread.sleep(200);
	                        	dis.unlock();
	                        }
	                    } catch (Exception e){  
	                    	System.out.println("【第"+threadId+"個線程】 拋出的異常:");
	                        e.printStackTrace();  
	                    }  
	                }  
	            }.start();  
	        }  
	}
}

第二遍會改進爲向子節點註冊監聽事件, 這樣就不用全部子節點都去向父節點註冊事件,子節點只會在本身前面一個節點註冊節點刪除事件分佈式

新手碼農,若有錯誤,但願你們多多指教,共同進步ide

相關文章
相關標籤/搜索