Apache Curator 操做Zookeeper api

Zookeeper客戶端(Apache Curator)

ZooKeeper經常使用客戶端java

  • zookeeper自帶的客戶端是官方提供的,比較底層、使用起來寫代碼麻煩、不夠直接。
  • Apache Curator是Apache的開源項目,封裝了zookeeper自帶的客戶端,使用相對簡便,易於使用。
  • zkclient是另外一個開源的ZooKeeper客戶端,其地址:github.com/adyliu/zkcl…

Curator主要解決了三類問題node

  • 封裝ZooKeeper client與ZooKeeper server之間的鏈接處理
  • 提供了一套Fluent風格的操做API
  • 提供ZooKeeper各類應用場景(recipe, 好比共享鎖服務, 集羣領導選舉機制)的抽象封裝

Java操做api

package com.qxw.controller;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 *  Curator主要解決了三類問題
	1.封裝ZooKeeper client與ZooKeeper server之間的鏈接處理
	2.提供了一套Fluent風格的操做API
	3.提供ZooKeeper各類應用場景(recipe, 好比共享鎖服務, 集羣領導選舉機制)的抽象封裝
 * @author qxw
 * @data 2018年8月14日下午2:08:51
 */
public class CuratorAp {
	/**
	 * Curator客戶端
	 */
    public static CuratorFramework client = null;
    /**
     * 集羣模式則是多個ip
     */
//    private static final String zkServerIps = "192.168.10.124:2182,192.168.10.124:2183,192.168.10.124:2184";
    private static final String zkServerIps = "127.0.0.1:2181";

	public static CuratorFramework getConnection(){
	        if(client==null){
	             synchronized (CuratorAp.class){
	               if(client==null){
	            		//經過工程建立鏈接
	           		client= CuratorFrameworkFactory.builder()
	           				.connectString(zkServerIps)
	           				.connectionTimeoutMs(5000) ///鏈接超時時間
	           				.sessionTimeoutMs(5000)  // 設定會話時間
	           				.retryPolicy(new ExponentialBackoffRetry(1000, 10))   // 重試策略:初試時間爲1s 重試10次
//	           				.namespace("super")  // 設置命名空間以及開始創建鏈接
	           				.build();
	           		
	           		 //開啓鏈接
	           		  client.start();
	           		  //分佈鎖
	           		 
		        	  System.out.println(client.getState());
	                }
	            }
	        }
			return client;
	  }
	
	/**
	 * 建立節點   不加withMode默認爲持久類型節點
	 * @param path  節點路徑
	 * @param value  值
	 */
	public static String create(String path,String value){
		try {
			//若建立節點的父節點不存在會先建立父節點再建立子節點
			return getConnection().create().creatingParentsIfNeeded().forPath("/super"+path,value.getBytes());
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	/**
	 * 建立節點 
	 * @param path  節點路徑
	 * @param value  值
	 * @param modeType 節點類型
	 */
	public static String create(String path,String value,String modeType){
		try {
			if(StringUtils.isEmpty(modeType)){
				return null;
			}
			//持久型節點
			if(CreateMode.PERSISTENT.equals(modeType)){
				//若建立節點的父節點不存在會先建立父節點再建立子節點
				return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super"+path,value.getBytes());
			}
			//臨時節點
			if(CreateMode.EPHEMERAL.equals(modeType)){
				//若建立節點的父節點不存在會先建立父節點再建立子節點
				return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/super"+path,value.getBytes());
			}
			
			//持久類型順序性節點
			if(CreateMode.PERSISTENT_SEQUENTIAL.equals(modeType)){
				//若建立節點的父節點不存在會先建立父節點再建立子節點
				return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/super"+path,value.getBytes());
			}
			
			//臨時類型順序性節點
			if(CreateMode.EPHEMERAL_SEQUENTIAL.equals(modeType)){
				//若建立節點的父節點不存在會先建立父節點再建立子節點
				return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/super"+path,value.getBytes());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	
	
	
	
	/**
	 * 獲取單個節點
	 * @param path
	 * @return
	 */
	public static String getData(String path){
		try {
			String str = new String(getConnection().getData().forPath("/super"+path));
			return str;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

	/**
	 *獲取字節點
	 * @param path
	 * @return
	 */
	public static List<String> getChildren(String path){
		try {
			List<String> list = getConnection().getChildren().forPath("/super"+path);
			return list;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	
	/**
	 * 修改節點值
	 * @param path
	 * @param valu
	 * @return
	 */
	public static String setData(String path,String valu){
		try {
			getConnection().setData().forPath("/super"+path,valu.getBytes());
			String str = new String(getConnection().getData().forPath("/super"+path));
			return str;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

	/**
	 * 刪除節點
	 * @param path
	 */
	public static void  delete(String path){
		try {
			getConnection().delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"+path);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
	/**
	 * 檢測節點是否存在
	 * @param path
	 * @return
	 */
	public static boolean  checkExists(String path){
		try {
			Stat s=getConnection().checkExists().forPath("/super"+path);
			return s==null? false:true;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
		
	}
	
	/**
	 * 分佈式鎖 對象
	 * @param path
	 * @return
	 */
	public static InterProcessMutex getLock(String path){
		InterProcessMutex lock=null;
		try {
			lock=new InterProcessMutex(getConnection(), "/super"+path);
			 return  lock;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
	
	
	
	public static void main(String[] args) throws Exception {
//		if(checkExists("/qxw")){
//			delete("/qxw");
//		}
//		System.out.println("建立節點:"+create("/qxw/q1", "蘇打水法薩芬撒"));
//		System.out.println("建立節點:"+create("/qxw/q2", "蘇打水法薩芬撒"));
//		System.out.println("建立節點:"+create("/qxw/q3", "蘇打水法薩芬撒"));
//		
//
//		
//		ExecutorService pool = Executors.newCachedThreadPool();
//		getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {
//			public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
//				System.out.println("code:" + ce.getResultCode());
//				System.out.println("type:" + ce.getType());
//				System.out.println("線程爲:" + Thread.currentThread().getName());
//			}
//		}, pool)
//		.forPath("/super/qxw/q4","q4內容".getBytes());
//		
//		System.out.println("讀取節點: "+getData("/qxw"));
//		System.out.println("讀取字節點:"+getChildren("/qxw").toString());
		
		test();
		
	}
	
	/***
	 * 分佈鎖演示
	 */
	private static int count=0;
	public  static void test() throws InterruptedException{
		final InterProcessMutex lock=getLock("/lock");
		final CountDownLatch c=new CountDownLatch(10);
		ExecutorService pool = Executors.newCachedThreadPool();
		for (int i = 0; i <10; i++) {
			pool.execute(new Runnable() {
				public void run() {
					try {		
						c.countDown();
						Thread.sleep(1000);
						//加鎖
						lock.acquire();
						System.out.println(System.currentTimeMillis()+"___"+(++count));
					} catch (Exception e) {
						e.printStackTrace();
					}finally{
						try {
							lock.release();
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
					
				}
			});
		}
		pool.shutdown();
		c.await();
		System.out.println("CountDownLatch執行完");
	}
}

複製代碼

zookeeper 集羣的 監控圖形化頁面

gitee.com/crystony/zo…linux

若是你是gradle用戶(2.0以上),請直接執行如下命令運行項目:git

gradle jettyRun
複製代碼

若是你沒使用gralde,執行項目跟路徑下的腳本,linux/windows用戶執行github

gradlew/gradlew.bat jettyRun
複製代碼

自動下載gralde完成後,會自動使用jetty啓動項目web

若是想將項目導入IDE調試,eclipse用戶執行apache

gradlew/gradlew.bat eclipse
複製代碼

idea用戶執行windows

gradlew/gradlew.bat idea
複製代碼

zookeeper分佈式鎖原理

分佈式鎖主要用於在分佈式環境中保護跨進程、跨主機、跨網絡的共享資源實現互斥訪問,以達到保證數據的一致性。 api

輸入圖片說明
屏幕截圖.png

左邊的整個區域表示一個Zookeeper集羣,locker是Zookeeper的一個持久節點,node_一、node_二、node_3是locker這個持久節點下面的臨時順序節點。client_一、client_二、client_n表示多個客戶端,Service表示須要互斥訪問的共享資源。bash

分佈式鎖獲取思路

  1. 在獲取分佈式鎖的時候在locker節點下建立臨時順序節點,釋放鎖的時候刪除該臨時節點。
  2. 客戶端調用createNode方法在locker下建立臨時順序節點,而後調用getChildren(「locker」)來獲取locker下面的全部子節點,注意此時不用設置任何Watcher。
  3. 客戶端獲取到全部的子節點path以後,若是發現本身建立的子節點序號最小,那麼就認爲該客戶端獲取到了鎖。
  4. 若是發現本身建立的節點並不是locker全部子節點中最小的,說明本身尚未獲取到鎖,此時客戶端須要找到比本身小的那個節點,而後對其調用exist()方法,同時對其註冊事件監聽器。
  5. 以後,等待它釋放鎖,也就是等待獲取到鎖的那個客戶端B把本身建立的那個節點刪除。,則客戶端A的Watcher會收到相應通知,此時再次判斷本身建立的節點是不是locker子節點中序號最小的,若是是則獲取到了鎖,若是不是則重複以上步驟繼續獲取到比本身小的一個節點並註冊監聽。
相關文章
相關標籤/搜索