ZooKeeper 之快速入門 破鏡重圓,堅持不懈!

-----------------破鏡重圓,堅持不懈!html


 

1. 概述

Zookeeper是Hadoop的一個子項目,它是分佈式系統中的協調系統,可提供的服務主要有:配置服務、名字服務、分佈式同步、組服務等。java

它有以下的一些特色:node

  • 簡單

Zookeeper的核心是一個精簡的文件系統,它支持一些簡單的操做和一些抽象操做,例如,排序和通知。apache

  • 豐富

         Zookeeper的原語操做是很豐富的,可實現一些協調數據結構和協議。例如,分佈式隊列、分佈式鎖和一組同級別節點中的「領導者選舉」。api

  • 高可靠

Zookeeper支持集羣模式,能夠很容易的解決單點故障問題。tomcat

  • 鬆耦合交互

不一樣進程間的交互不須要了解彼此,甚至能夠沒必要同時存在,某進程在zookeeper中留下消息後,該進程結束後其它進程還能夠讀這條消息。服務器

  • 資源庫

         Zookeeper實現了一個關於通用協調模式的開源共享存儲庫,能使開發者免於編寫這類通用協議。數據結構

 

2. ZooKeeper的安裝

  • 獨立模式安裝

Zookeeper的運行環境是須要java的,建議安裝oracle的java6.oracle

可去官網下載一個穩定的版本,而後進行安裝:http://zookeeper.apache.org/異步

解壓後在zookeeper的conf目錄下建立配置文件zoo.cfg,裏面的配置信息可參考統計目錄下的zoo_sample.cfg文件,咱們這裏配置爲:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-data/
clientPort=2181

tickTime指定了ZooKeeper的基本時間單位(以毫秒爲單位);

initLimit指定了啓動zookeeper時,zookeeper實例中的隨從實例同步到領導實例的初始化鏈接時間限制,超出時間限制則鏈接失敗(以tickTime爲時間單位);

syncLimit指定了zookeeper正常運行時,主從節點之間同步數據的時間限制,若超過這個時間限制,那麼隨從實例將會被丟棄;

dataDirzookeeper存放數據的目錄;

clientPort用於鏈接客戶端的端口。

  • 啓動一個本地的ZooKeeper實例
% zkServer.sh start

檢查ZooKeeper是否正在運行

echo ruok | nc localhost 2181

如果正常運行的話會打印「imok」。

3. ZooKeeper監控

  • 遠程JMX配置

默認狀況下,zookeeper是支持本地的jmx監控的。若須要遠程監控zookeeper,則須要進行進行以下配置。

默認的配置有這麼一行:

ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"

我們在$JMXLOCALONLY後邊添加jmx的相關參數配置:

複製代碼
ZOOMAIN="-Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY
                -Djava.rmi.server.hostname=192.168.1.8
                -Dcom.sun.management.jmxremote.port=1911
                -Dcom.sun.management.jmxremote.ssl=false
                -Dcom.sun.management.jmxremote.authenticate=false
                 org.apache.zookeeper.server.quorum.QuorumPeerMain"
複製代碼

這樣就能夠遠程監控了,能夠用jconsole.exe或jvisualvm.exe等工具對其進行監控。

  • 身份驗證

這裏沒有配置驗證信息,若是須要請參見個人博文jvisualvm遠程監控tomcat:http://www.cnblogs.com/leocook/p/jvisualvmandtomcat.html

4. Zookeeper的存儲模型

Zookeeper的數據存儲採用的是結構化存儲,結構化存儲是沒有文件和目錄的概念,裏邊的目錄和文件被抽象成了節點(node),zookeeper裏能夠稱爲znode。Znode的層次結構以下圖:

最上邊的是根目錄,下邊分別是不一樣級別的子目錄。

5. Zookeeper客戶端的使用

  • zkCli.sh

可以使用./zkCli.sh -server localhost來鏈接到Zookeeper服務上。

使用ls /可查看根節點下有哪些子節點,能夠雙擊Tab鍵查看更多命令。

  • Java客戶端

可建立org.apache.zookeeper.ZooKeeper對象來做爲zk的客戶端,注意,java api裏建立zk客戶端是異步的,爲防止在客戶端還未完成建立就被使用的狀況,這裏可使用同步計時器,確保zk對象建立完成再被使用。

  • C客戶端

可使用zhandle_t指針來表示zk客戶端,可用zookeeper_init方法來建立。可在ZK_HOME\src\c\src\ cli.c查看部分示例代碼。

6. Zookeeper建立Znode

Znode有兩種類型:短暫的和持久的。短暫的znode在建立的客戶端與服務器端斷開(不管是明確的斷開仍是故障斷開)鏈接時,該znode都會被刪除;相反,持久的znode則不會。

複製代碼
public class CreateGroup implements Watcher{
    private static final int SESSION_TIMEOUT = 1000;//會話延時

    private ZooKeeper zk = null;
    private CountDownLatch countDownLatch = new CountDownLatch(1);//同步計數器

    public void process(WatchedEvent event) {
        if(event.getState() == KeeperState.SyncConnected){
            countDownLatch.countDown();//計數器減一
        }
    }

    /**
     * 建立zk對象
     * 當客戶端鏈接上zookeeper時會執行process(event)裏的countDownLatch.countDown(),計數器的值變爲0,則countDownLatch.await()方法返回。
     * @param hosts
     * @throws IOException
     * @throws InterruptedException
     */
    public void connect(String hosts) throws IOException, InterruptedException {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await();//阻塞程序繼續執行
    }
    
    /**
     * 建立group
     * 
     * @param groupName 組名
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void create(String groupName) throws KeeperException, InterruptedException {
        String path = "/" + groupName;
        String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE/*容許任何客戶端對該znode進行讀寫*/, CreateMode.PERSISTENT/*持久化的znode*/);
        System.out.println("Created " + createPath);
    }
    
    /**
     * 關閉zk
     * @throws InterruptedException
     */
    public void close() throws InterruptedException {
        if(zk != null){
            try {
                zk.close();
            } catch (InterruptedException e) {
                throw e;
            }finally{
                zk = null;
                System.gc();
            }
        }
    }
}
複製代碼

這裏咱們使用了同步計數器CountDownLatch,在connect方法中建立執行了zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);以後,下邊接着調用了CountDownLatch對象的await方法阻塞,由於這是zk客戶端不必定已經完成了與服務端的鏈接,在客戶端鏈接到服務端時會觸發觀察者調用process()方法,咱們在方法裏邊判斷一下觸發事件的類型,完成鏈接後計數器減一,connect方法中解除阻塞。

還有兩個地方須要注意:這裏建立的znode的訪問權限是open的,且該znode是持久化存儲的。

測試類以下:

複製代碼
public class CreateGroupTest {
    private static String hosts = "192.168.1.8";
    private static String groupName = "zoo";
    
    private CreateGroup createGroup = null;
    
    /**
     * init
     * @throws InterruptedException 
     * @throws KeeperException 
     * @throws IOException 
     */
    @Before
    public void init() throws KeeperException, InterruptedException, IOException {
        createGroup = new CreateGroup();
        createGroup.connect(hosts);
    }
    
    @Test
    public void testCreateGroup() throws KeeperException, InterruptedException {
        createGroup.create(groupName);
    }
    
    /**
     * 銷燬資源
     */
    @After
    public void destroy() {
        try {
            createGroup.close();
            createGroup = null;
            System.gc();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

因爲zk對象的建立和銷燬代碼是能夠複用的,因此這裏咱們把它分裝成了接口:

複製代碼
/**
 * 鏈接的觀察者,封裝了zk的建立等
 * @author leo
 *
 */
public class ConnectionWatcher implements Watcher {
    private static final int SESSION_TIMEOUT = 5000;

    protected ZooKeeper zk = null;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void process(WatchedEvent event) {
        KeeperState state = event.getState();
        
        if(state == KeeperState.SyncConnected){
            countDownLatch.countDown();
        }
    }
    
    /**
     * 鏈接資源
     * @param hosts
     * @throws IOException
     * @throws InterruptedException
     */
    public void connection(String hosts) throws IOException, InterruptedException {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await();
    }
    
    /**
     * 釋放資源
     * @throws InterruptedException
     */
    public void close() throws InterruptedException {
        if (null != zk) {
            try {
                zk.close();
            } catch (InterruptedException e) {
                throw e;
            }finally{
                zk = null;
                System.gc();
            }
        }
    }
}
複製代碼

7. Zookeeper刪除Znode

複製代碼
/**
 * 刪除分組
 * @author leo
 *
 */
public class DeleteGroup extends ConnectionWatcher {
    public void delete(String groupName) {
        String path = "/" + groupName;
        
        try {
            List<String> children = zk.getChildren(path, false);
            
            for(String child : children){
                zk.delete(path + "/" + child, -1);
            }
            zk.delete(path, -1);//版本號爲-1,
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

zk.delete(path,version)方法的第二個參數是znode版本號,若是提供的版本號和znode版本號一致纔會刪除這個znode,這樣能夠檢測出對znode的修改衝突。經過將版本號設置爲-1,能夠繞過這個版本檢測機制,不管znode的版本號是什麼,都會直接將其刪除。

測試類:

複製代碼
public class DeleteGroupTest {
    private static final String HOSTS = "192.168.1.137";
    private static final String groupName = "zoo";
    
    private DeleteGroup deleteGroup = null;
    
    @Before
    public void init() throws IOException, InterruptedException {
        deleteGroup = new DeleteGroup();
        deleteGroup.connection(HOSTS);
    }
    
    @Test
    public void testDelete() throws IOException, InterruptedException, KeeperException {
        deleteGroup.delete(groupName);
    }
    
    @After
    public void destroy() throws InterruptedException {
        if(null != deleteGroup){
            try {
                deleteGroup.close();
            } catch (InterruptedException e) {
                throw e;
            }finally{
                deleteGroup = null;
                System.gc();
            }
        }
    }
}
複製代碼

8. Zookeeper的相關操做

ZooKeeper中共有9中操做:

create:建立一個znode

delete:刪除一個znode

exists:測試一個znode

getACL,setACL:獲取/設置一個znode的ACL(權限控制)

getChildren:獲取一個znode的子節點

getData,setData:獲取/設置一個znode所保存的數據

sync:將客戶端的znode視圖與ZooKeeper同步

這裏更新數據是必需要提供znode的版本號(也可使用-1強制更新,這裏能夠執行前經過exists方法拿到znode的元數據Stat對象,而後從Stat對象中拿到對應的版本號信息),若是版本號不匹配,則更新會失敗。所以一個更新失敗的客戶端能夠嘗試是否重試或執行其它操做。

9. ZooKeeper的API

ZooKeeper的api支持多種語言,在操做時能夠選擇使用同步api仍是異步api。同步api通常是直接返回結果,異步api通常是經過回調來傳送執行結果的,通常方法中有某參數是類AsyncCallback的內部接口,那麼該方法應該就是異步調用,回調方法名爲processResult。

10. 觀察觸發器

能夠對客戶端和服務器端之間的鏈接設置觀察觸發器(後邊稱之爲zookeeper的狀態觀察觸發器),也能夠對znode設置觀察觸發器。

  • 狀態觀察器

zk的整個生命週期以下:

可在建立zk對象時傳入一個觀察器,在完成CONNECTING狀態到CONNECTED狀態時,觀察器會觸發一個事件,該觸發的事件類型爲NONE,經過event.getState()方法拿到事件狀態爲SyncConnected。有一點須要注意的就是,在zk調用close方法時不會觸發任何事件,由於這類的顯示調用是開發者主動執行的,屬於可控的,不用使用事件通知來告知程序。這一塊在下篇博文還會詳細解說。

  • 設置znode的觀察器

能夠在讀操做exists、getChildren和getData上設置觀察,在執行寫操做create、delete和setData將會觸發觀察事件,固然,在執行寫的操做時,也能夠選擇是否觸發znode上設置的觀察器,具體可查看相關的api。

當觀察的znode被建立、刪除或其數據被更新時,設置在exists上的觀察將會被觸發;

當觀察的znode被刪除或數據被更新時,設置在getData上的觀察將會被觸發;

當觀察的znode的子節點被建立、刪除或znode自身被刪除時,設置在getChildren上的觀察將會被觸發,可經過觀察事件的類型來判斷被刪除的是znode仍是它的子節點。

對於NodeCreatedNodeDeleted根據路徑就能發現是哪一個znode被寫;對於NodeChildrenChanged可根據getChildren來獲取新的子節點列表。

注意:在收到收到觸發事件到執行讀操做之間,znode的狀態可能會發生狀態,這點須要牢記。

至此,編寫簡單的zookeeper應該是能夠的了,下篇博文我們來深刻探討zookeeper的相關知識

相關文章
相關標籤/搜索