ZooKeeper(動物園管理員),顧名思義,是用來管理Hadoop(大象)、Hive(蜜蜂)、Pig(小豬)的管理員,同時Apache HBase、Apache Solr、LinkedIn Sensei等衆多項目中都採用了ZooKeeper。 ZooKeeper曾是Hadoop的正式子項目,後發展成爲Apache頂級項目,與Hadoop密切相關但卻沒有任何依賴。它是一個針對大型應用提供高可用的數據管理、應用程序協調服務的分佈式服務框架,基於對Paxos算法的實現,使該框架保證了分佈式環境中數據的強一致性,提供的功能包括:配置維護、統一命名服務、狀態同步服務、集羣管理等。 在分佈式應用中,因爲工程師不能很好地使用鎖機制,以及基於消息的協調機制不適合在某些應用中使用,所以須要有一種可靠的、可擴展的、分佈式的、可配置的協調機制來統一系統的狀態。Zookeeper的目的就在於此java
ZooKeeper能夠理解爲相似redis的緩存數據庫,只是相對於redis存儲數據量小,額外增長了存儲節點的機制, 經常使用於分佈式協調服務node
Curator經常使用apiredis
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
複製代碼
//建立節點
//PERSISTENT:持久化 默認模式
//PERSISTENT_SEQUENTIAL:持久化而且帶序列號
//EPHEMERAL:臨時
//EPHEMERAL_SEQUENTIAL:臨時而且帶序列號
//建立節點並遞歸建立父節點,並指定建立模式
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/data/path", "hello".getBytes());
複製代碼
client.delete().deletingChildrenIfNeeded().forPath("/data/path");
複製代碼
client.setData().forPath("/data/path", "world".getBytes());
複製代碼
byte[] data = client.getData().forPath("/data/path");//獲取指定節點數據
List<String> childs = client.getChildren().forPath("/")//獲取子節點
複製代碼
Curator事件(cache)算法
ZooKeeper原生支持經過註冊Watcher來進行事件監聽,可是其使用並非特別方便,須要開發人員本身反覆註冊Watcher,比較繁瑣。Curator引入了Cache來實現對ZooKeeper服務端事件的監聽。Cache是Curator中對事件監聽的包裝,其對事件的監聽其實能夠近似看做是一個本地緩存視圖和遠程ZooKeeper視圖的對比過程。同時Curator可以自動爲開發人員處理反覆註冊監聽,從而大大簡化了原生API開發的繁瑣過程數據庫
private static final String PATH_CACHE = "/example/pathCache";
private static final String NODE_CACHE = "/example/nodeCache";
private static final String TREE_CACHE = "/example/treeCache";
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
//PathChildrenCache
System.out.println("=========================PathChildrenCache==========================");
PathChildrenCache pCache = new PathChildrenCache(client, PATH_CACHE, true);
pCache.start();
pCache.getListenable().addListener((c, e) -> {
System.out.println("事件類型:" + e.getType());
if (null != e.getData()) {
System.out.println("節點數據:" + e.getData().getPath() + " = " + new String(e.getData().getData()));
}
});
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
Thread.sleep(100);
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
Thread.sleep(100);
client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
Thread.sleep(100);
for (ChildData data : pCache.getCurrentData()) {
System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
}
client.delete().forPath("/example/pathCache/test01");
Thread.sleep(100);
client.delete().forPath("/example/pathCache/test02");
Thread.sleep(2000);
pCache.close();
//Node Cache
System.out.println("=========================NodeCache==========================");
client.create().creatingParentsIfNeeded().forPath(NODE_CACHE);
NodeCache nCache = new NodeCache(client, NODE_CACHE);
nCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData data = nCache.getCurrentData();
if (null != data) {
System.out.println("節點數據:" + new String(nCache.getCurrentData().getData()));
} else {
System.out.println("節點被刪除!");
}
}
});
nCache.start();
client.setData().forPath(NODE_CACHE, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(NODE_CACHE, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(NODE_CACHE);
Thread.sleep(2000);
nCache.close();
//Tree cache
System.out.println("=========================TreeCache==========================");
client.create().creatingParentsIfNeeded().forPath(TREE_CACHE);
TreeCache cache = new TreeCache(client, TREE_CACHE);
cache.getListenable().addListener((c, e) ->
System.out.println("事件類型:" + e.getType() + " | 路徑:" + (null != e.getData() ? e.getData().getPath() : null)));
cache.start();
client.setData().forPath(TREE_CACHE, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(TREE_CACHE, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(TREE_CACHE);
Thread.sleep(1000 * 2);
cache.close();
client.close();
}
複製代碼
=========================PathChildrenCache==========================
事件類型:CONNECTION_RECONNECTED
事件類型:CHILD_ADDED
節點數據:/example/pathCache/test01 = 01
事件類型:CHILD_ADDED
節點數據:/example/pathCache/test02 = 02
事件類型:CHILD_UPDATED
節點數據:/example/pathCache/test01 = 01_V2
getCurrentData:/example/pathCache/test01 = 01_V2
getCurrentData:/example/pathCache/test02 = 02
事件類型:CHILD_REMOVED
節點數據:/example/pathCache/test01 = 01_V2
事件類型:CHILD_REMOVED
節點數據:/example/pathCache/test02 = 02
=========================NodeCache==========================
節點數據:01
節點數據:02
節點被刪除!
=========================TreeCache==========================
事件類型:NODE_ADDED | 路徑:/example/treeCache
事件類型:INITIALIZED | 路徑:null
事件類型:NODE_UPDATED | 路徑:/example/treeCache
事件類型:NODE_UPDATED | 路徑:/example/treeCache
事件類型:NODE_REMOVED | 路徑:/example/treeCache
複製代碼
zookeeper選舉機制api
流程示意圖緩存
流程解析bash
選舉完成後會將以上信息發給集羣中的每一個節點,默認是採用投票數大於半數則勝出的邏輯,因此zookeeper集羣的節點數通常都是單數服務器
zookeeper 分佈式鎖應用session
zookeeper特性
分佈式鎖原理
基於Curator分佈式鎖代碼展現
//建立客戶端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
//建立分佈式鎖, 鎖空間的根節點路徑爲/curator/lock
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
mutex.acquire();
//得到了鎖, 進行業務流程
//todo
//完成業務流程, 釋放鎖
mutex.release();
//關閉客戶端
client.close();
複製代碼