ZooKeeper經常使用客戶端java
Curator主要解決了三類問題node
package com.qxw.controller;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
* Curator主要解決了三類問題
1.封裝ZooKeeper client與ZooKeeper server之間的鏈接處理
2.提供了一套Fluent風格的操做API
3.提供ZooKeeper各類應用場景(recipe, 好比共享鎖服務, 集羣領導選舉機制)的抽象封裝
* @author qxw
* @data 2018年8月14日下午2:08:51
*/
public class CuratorAp {
/**
* Curator客戶端
*/
public static CuratorFramework client = null;
/**
* 集羣模式則是多個ip
*/
// private static final String zkServerIps = "192.168.10.124:2182,192.168.10.124:2183,192.168.10.124:2184";
private static final String zkServerIps = "127.0.0.1:2181";
public static CuratorFramework getConnection(){
if(client==null){
synchronized (CuratorAp.class){
if(client==null){
//經過工程建立鏈接
client= CuratorFrameworkFactory.builder()
.connectString(zkServerIps)
.connectionTimeoutMs(5000) ///鏈接超時時間
.sessionTimeoutMs(5000) // 設定會話時間
.retryPolicy(new ExponentialBackoffRetry(1000, 10)) // 重試策略:初試時間爲1s 重試10次
// .namespace("super") // 設置命名空間以及開始創建鏈接
.build();
//開啓鏈接
client.start();
//分佈鎖
System.out.println(client.getState());
}
}
}
return client;
}
/**
* 建立節點 不加withMode默認爲持久類型節點
* @param path 節點路徑
* @param value 值
*/
public static String create(String path,String value){
try {
//若建立節點的父節點不存在會先建立父節點再建立子節點
return getConnection().create().creatingParentsIfNeeded().forPath("/super"+path,value.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 建立節點
* @param path 節點路徑
* @param value 值
* @param modeType 節點類型
*/
public static String create(String path,String value,String modeType){
try {
if(StringUtils.isEmpty(modeType)){
return null;
}
//持久型節點
if(CreateMode.PERSISTENT.equals(modeType)){
//若建立節點的父節點不存在會先建立父節點再建立子節點
return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super"+path,value.getBytes());
}
//臨時節點
if(CreateMode.EPHEMERAL.equals(modeType)){
//若建立節點的父節點不存在會先建立父節點再建立子節點
return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/super"+path,value.getBytes());
}
//持久類型順序性節點
if(CreateMode.PERSISTENT_SEQUENTIAL.equals(modeType)){
//若建立節點的父節點不存在會先建立父節點再建立子節點
return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/super"+path,value.getBytes());
}
//臨時類型順序性節點
if(CreateMode.EPHEMERAL_SEQUENTIAL.equals(modeType)){
//若建立節點的父節點不存在會先建立父節點再建立子節點
return getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/super"+path,value.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 獲取單個節點
* @param path
* @return
*/
public static String getData(String path){
try {
String str = new String(getConnection().getData().forPath("/super"+path));
return str;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
*獲取字節點
* @param path
* @return
*/
public static List<String> getChildren(String path){
try {
List<String> list = getConnection().getChildren().forPath("/super"+path);
return list;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 修改節點值
* @param path
* @param valu
* @return
*/
public static String setData(String path,String valu){
try {
getConnection().setData().forPath("/super"+path,valu.getBytes());
String str = new String(getConnection().getData().forPath("/super"+path));
return str;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 刪除節點
* @param path
*/
public static void delete(String path){
try {
getConnection().delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"+path);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 檢測節點是否存在
* @param path
* @return
*/
public static boolean checkExists(String path){
try {
Stat s=getConnection().checkExists().forPath("/super"+path);
return s==null? false:true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 分佈式鎖 對象
* @param path
* @return
*/
public static InterProcessMutex getLock(String path){
InterProcessMutex lock=null;
try {
lock=new InterProcessMutex(getConnection(), "/super"+path);
return lock;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) throws Exception {
// if(checkExists("/qxw")){
// delete("/qxw");
// }
// System.out.println("建立節點:"+create("/qxw/q1", "蘇打水法薩芬撒"));
// System.out.println("建立節點:"+create("/qxw/q2", "蘇打水法薩芬撒"));
// System.out.println("建立節點:"+create("/qxw/q3", "蘇打水法薩芬撒"));
//
//
//
// ExecutorService pool = Executors.newCachedThreadPool();
// getConnection().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {
// public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
// System.out.println("code:" + ce.getResultCode());
// System.out.println("type:" + ce.getType());
// System.out.println("線程爲:" + Thread.currentThread().getName());
// }
// }, pool)
// .forPath("/super/qxw/q4","q4內容".getBytes());
//
// System.out.println("讀取節點: "+getData("/qxw"));
// System.out.println("讀取字節點:"+getChildren("/qxw").toString());
test();
}
/***
* 分佈鎖演示
*/
private static int count=0;
public static void test() throws InterruptedException{
final InterProcessMutex lock=getLock("/lock");
final CountDownLatch c=new CountDownLatch(10);
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i <10; i++) {
pool.execute(new Runnable() {
public void run() {
try {
c.countDown();
Thread.sleep(1000);
//加鎖
lock.acquire();
System.out.println(System.currentTimeMillis()+"___"+(++count));
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
pool.shutdown();
c.await();
System.out.println("CountDownLatch執行完");
}
}
複製代碼
若是你是gradle用戶(2.0以上),請直接執行如下命令運行項目:git
gradle jettyRun
複製代碼
若是你沒使用gralde,執行項目跟路徑下的腳本,linux/windows用戶執行github
gradlew/gradlew.bat jettyRun
複製代碼
自動下載gralde完成後,會自動使用jetty啓動項目web
若是想將項目導入IDE調試,eclipse用戶執行apache
gradlew/gradlew.bat eclipse
複製代碼
idea用戶執行windows
gradlew/gradlew.bat idea
複製代碼
分佈式鎖主要用於在分佈式環境中保護跨進程、跨主機、跨網絡的共享資源實現互斥訪問,以達到保證數據的一致性。 api
左邊的整個區域表示一個Zookeeper集羣,locker是Zookeeper的一個持久節點,node_一、node_二、node_3是locker這個持久節點下面的臨時順序節點。client_一、client_二、client_n表示多個客戶端,Service表示須要互斥訪問的共享資源。bash
分佈式鎖獲取思路