-----------------破鏡重圓,堅持不懈!html
Zookeeper是Hadoop的一個子項目,它是分佈式系統中的協調系統,可提供的服務主要有:配置服務、名字服務、分佈式同步、組服務等。java
它有以下的一些特色:node
Zookeeper的核心是一個精簡的文件系統,它支持一些簡單的操做和一些抽象操做,例如,排序和通知。apache
Zookeeper的原語操做是很豐富的,可實現一些協調數據結構和協議。例如,分佈式隊列、分佈式鎖和一組同級別節點中的「領導者選舉」。api
Zookeeper支持集羣模式,能夠很容易的解決單點故障問題。tomcat
不一樣進程間的交互不須要了解彼此,甚至能夠沒必要同時存在,某進程在zookeeper中留下消息後,該進程結束後其它進程還能夠讀這條消息。服務器
Zookeeper實現了一個關於通用協調模式的開源共享存儲庫,能使開發者免於編寫這類通用協議。數據結構
Zookeeper的運行環境是須要java的,建議安裝oracle的java6.oracle
可去官網下載一個穩定的版本,而後進行安裝:http://zookeeper.apache.org/異步
解壓後在zookeeper的conf目錄下建立配置文件zoo.cfg,裏面的配置信息可參考統計目錄下的zoo_sample.cfg文件,咱們這裏配置爲:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/zookeeper-data/ clientPort=2181
tickTime:指定了ZooKeeper的基本時間單位(以毫秒爲單位);
initLimit:指定了啓動zookeeper時,zookeeper實例中的隨從實例同步到領導實例的初始化鏈接時間限制,超出時間限制則鏈接失敗(以tickTime爲時間單位);
syncLimit:指定了zookeeper正常運行時,主從節點之間同步數據的時間限制,若超過這個時間限制,那麼隨從實例將會被丟棄;
dataDir:zookeeper存放數據的目錄;
clientPort:用於鏈接客戶端的端口。
% zkServer.sh start
檢查ZooKeeper是否正在運行
echo ruok | nc localhost 2181
如果正常運行的話會打印「imok」。
默認狀況下,zookeeper是支持本地的jmx監控的。若須要遠程監控zookeeper,則須要進行進行以下配置。
默認的配置有這麼一行:
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
我們在$JMXLOCALONLY後邊添加jmx的相關參數配置:
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY -Djava.rmi.server.hostname=192.168.1.8 -Dcom.sun.management.jmxremote.port=1911 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false org.apache.zookeeper.server.quorum.QuorumPeerMain"
這樣就能夠遠程監控了,能夠用jconsole.exe或jvisualvm.exe等工具對其進行監控。
這裏沒有配置驗證信息,若是須要請參見個人博文jvisualvm遠程監控tomcat:http://www.cnblogs.com/leocook/p/jvisualvmandtomcat.html
Zookeeper的數據存儲採用的是結構化存儲,結構化存儲是沒有文件和目錄的概念,裏邊的目錄和文件被抽象成了節點(node),zookeeper裏能夠稱爲znode。Znode的層次結構以下圖:
最上邊的是根目錄,下邊分別是不一樣級別的子目錄。
可以使用./zkCli.sh -server localhost來鏈接到Zookeeper服務上。
使用ls /可查看根節點下有哪些子節點,能夠雙擊Tab鍵查看更多命令。
可建立org.apache.zookeeper.ZooKeeper對象來做爲zk的客戶端,注意,java api裏建立zk客戶端是異步的,爲防止在客戶端還未完成建立就被使用的狀況,這裏可使用同步計時器,確保zk對象建立完成再被使用。
可使用zhandle_t指針來表示zk客戶端,可用zookeeper_init方法來建立。可在ZK_HOME\src\c\src\ cli.c查看部分示例代碼。
Znode有兩種類型:短暫的和持久的。短暫的znode在建立的客戶端與服務器端斷開(不管是明確的斷開仍是故障斷開)鏈接時,該znode都會被刪除;相反,持久的znode則不會。
public class CreateGroup implements Watcher{ private static final int SESSION_TIMEOUT = 1000;//會話延時 private ZooKeeper zk = null; private CountDownLatch countDownLatch = new CountDownLatch(1);//同步計數器 public void process(WatchedEvent event) { if(event.getState() == KeeperState.SyncConnected){ countDownLatch.countDown();//計數器減一 } } /** * 建立zk對象 * 當客戶端鏈接上zookeeper時會執行process(event)裏的countDownLatch.countDown(),計數器的值變爲0,則countDownLatch.await()方法返回。 * @param hosts * @throws IOException * @throws InterruptedException */ public void connect(String hosts) throws IOException, InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); countDownLatch.await();//阻塞程序繼續執行 } /** * 建立group * * @param groupName 組名 * @throws KeeperException * @throws InterruptedException */ public void create(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName; String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE/*容許任何客戶端對該znode進行讀寫*/, CreateMode.PERSISTENT/*持久化的znode*/); System.out.println("Created " + createPath); } /** * 關閉zk * @throws InterruptedException */ public void close() throws InterruptedException { if(zk != null){ try { zk.close(); } catch (InterruptedException e) { throw e; }finally{ zk = null; System.gc(); } } } }
這裏咱們使用了同步計數器CountDownLatch,在connect方法中建立執行了zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);以後,下邊接着調用了CountDownLatch對象的await方法阻塞,由於這是zk客戶端不必定已經完成了與服務端的鏈接,在客戶端鏈接到服務端時會觸發觀察者調用process()方法,咱們在方法裏邊判斷一下觸發事件的類型,完成鏈接後計數器減一,connect方法中解除阻塞。
還有兩個地方須要注意:這裏建立的znode的訪問權限是open的,且該znode是持久化存儲的。
測試類以下:
public class CreateGroupTest { private static String hosts = "192.168.1.8"; private static String groupName = "zoo"; private CreateGroup createGroup = null; /** * init * @throws InterruptedException * @throws KeeperException * @throws IOException */ @Before public void init() throws KeeperException, InterruptedException, IOException { createGroup = new CreateGroup(); createGroup.connect(hosts); } @Test public void testCreateGroup() throws KeeperException, InterruptedException { createGroup.create(groupName); } /** * 銷燬資源 */ @After public void destroy() { try { createGroup.close(); createGroup = null; System.gc(); } catch (InterruptedException e) { e.printStackTrace(); } } }
因爲zk對象的建立和銷燬代碼是能夠複用的,因此這裏咱們把它分裝成了接口:
/** * 鏈接的觀察者,封裝了zk的建立等 * @author leo * */ public class ConnectionWatcher implements Watcher { private static final int SESSION_TIMEOUT = 5000; protected ZooKeeper zk = null; private CountDownLatch countDownLatch = new CountDownLatch(1); public void process(WatchedEvent event) { KeeperState state = event.getState(); if(state == KeeperState.SyncConnected){ countDownLatch.countDown(); } } /** * 鏈接資源 * @param hosts * @throws IOException * @throws InterruptedException */ public void connection(String hosts) throws IOException, InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); countDownLatch.await(); } /** * 釋放資源 * @throws InterruptedException */ public void close() throws InterruptedException { if (null != zk) { try { zk.close(); } catch (InterruptedException e) { throw e; }finally{ zk = null; System.gc(); } } } }
/** * 刪除分組 * @author leo * */ public class DeleteGroup extends ConnectionWatcher { public void delete(String groupName) { String path = "/" + groupName; try { List<String> children = zk.getChildren(path, false); for(String child : children){ zk.delete(path + "/" + child, -1); } zk.delete(path, -1);//版本號爲-1, } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
zk.delete(path,version)方法的第二個參數是znode版本號,若是提供的版本號和znode版本號一致纔會刪除這個znode,這樣能夠檢測出對znode的修改衝突。經過將版本號設置爲-1,能夠繞過這個版本檢測機制,不管znode的版本號是什麼,都會直接將其刪除。
測試類:
public class DeleteGroupTest { private static final String HOSTS = "192.168.1.137"; private static final String groupName = "zoo"; private DeleteGroup deleteGroup = null; @Before public void init() throws IOException, InterruptedException { deleteGroup = new DeleteGroup(); deleteGroup.connection(HOSTS); } @Test public void testDelete() throws IOException, InterruptedException, KeeperException { deleteGroup.delete(groupName); } @After public void destroy() throws InterruptedException { if(null != deleteGroup){ try { deleteGroup.close(); } catch (InterruptedException e) { throw e; }finally{ deleteGroup = null; System.gc(); } } } }
ZooKeeper中共有9中操做:
create:建立一個znode
delete:刪除一個znode
exists:測試一個znode
getACL,setACL:獲取/設置一個znode的ACL(權限控制)
getChildren:獲取一個znode的子節點
getData,setData:獲取/設置一個znode所保存的數據
sync:將客戶端的znode視圖與ZooKeeper同步
這裏更新數據是必需要提供znode的版本號(也可使用-1強制更新,這裏能夠執行前經過exists方法拿到znode的元數據Stat對象,而後從Stat對象中拿到對應的版本號信息),若是版本號不匹配,則更新會失敗。所以一個更新失敗的客戶端能夠嘗試是否重試或執行其它操做。
ZooKeeper的api支持多種語言,在操做時能夠選擇使用同步api仍是異步api。同步api通常是直接返回結果,異步api通常是經過回調來傳送執行結果的,通常方法中有某參數是類AsyncCallback的內部接口,那麼該方法應該就是異步調用,回調方法名爲processResult。
能夠對客戶端和服務器端之間的鏈接設置觀察觸發器(後邊稱之爲zookeeper的狀態觀察觸發器),也能夠對znode設置觀察觸發器。
zk的整個生命週期以下:
可在建立zk對象時傳入一個觀察器,在完成CONNECTING狀態到CONNECTED狀態時,觀察器會觸發一個事件,該觸發的事件類型爲NONE,經過event.getState()方法拿到事件狀態爲SyncConnected。有一點須要注意的就是,在zk調用close方法時不會觸發任何事件,由於這類的顯示調用是開發者主動執行的,屬於可控的,不用使用事件通知來告知程序。這一塊在下篇博文還會詳細解說。
能夠在讀操做exists、getChildren和getData上設置觀察,在執行寫操做create、delete和setData將會觸發觀察事件,固然,在執行寫的操做時,也能夠選擇是否觸發znode上設置的觀察器,具體可查看相關的api。
當觀察的znode被建立、刪除或其數據被更新時,設置在exists上的觀察將會被觸發;
當觀察的znode被刪除或數據被更新時,設置在getData上的觀察將會被觸發;
當觀察的znode的子節點被建立、刪除或znode自身被刪除時,設置在getChildren上的觀察將會被觸發,可經過觀察事件的類型來判斷被刪除的是znode仍是它的子節點。
對於NodeCreated和NodeDeleted根據路徑就能發現是哪一個znode被寫;對於NodeChildrenChanged可根據getChildren來獲取新的子節點列表。
注意:在收到收到觸發事件到執行讀操做之間,znode的狀態可能會發生狀態,這點須要牢記。
至此,編寫簡單的zookeeper應該是能夠的了,下篇博文我們來深刻探討zookeeper的相關知識