zookeeper機制

 

1.  Zookeeper概念簡介:

Zookeeper是一個分佈式協調服務;就是爲用戶的分佈式應用程序提供協調服務java

A、zookeeper是爲別的分佈式程序服務的node

B、Zookeeper自己就是一個分佈式程序(只要有半數以上節點存活,zk就能正常服務)apache

C、Zookeeper所提供的服務涵蓋:主從協調、服務器節點動態上下線、統一配置管理、分佈式共享鎖、統一名稱服務……api

D、雖說能夠提供各類服務,可是zookeeper在底層其實只提供了兩個功能:服務器

管理(存儲,讀取)用戶程序提交的數據;數據結構

併爲用戶程序提供數據節點監聽服務;app

 

Zookeeper經常使用應用場景:dom

《見圖》分佈式

 

Zookeeper集羣的角色:  Leader 和  follower  (Observer)工具

只要集羣中有半數以上節點存活,集羣就能提供服務

 

2.  zookeeper集羣機制

半數機制:集羣中半數以上機器存活,集羣可用。

zookeeper適合裝在奇數臺機器上!!!

3.  安裝

3.1. 安裝

3.1.1.   機器部署

安裝到3臺虛擬機上

安裝好JDK

 

 

3.1.2.   上傳

上傳用工具。

3.1.3.   解壓

su – hadoop(切換到hadoop用戶)

tar -zxvf zookeeper-3.4.5.tar.gz(解壓)

3.1.4.   重命名

mv zookeeper-3.4.5 zookeeper(重命名文件夾zookeeper-3.4.5爲zookeeper)

3.1.5.   修改環境變量

一、su – root(切換用戶到root)

二、vi /etc/profile(修改文件)

三、添加內容:

export ZOOKEEPER_HOME=/home/hadoop/zookeeper

export PATH=$PATH:$ZOOKEEPER_HOME/bin

四、從新編譯文件:

source /etc/profile

五、注意:3臺zookeeper都須要修改

六、修改完成後切換回hadoop用戶:

su - hadoop

3.1.6.   修改配置文件

一、用hadoop用戶操做

cd zookeeper/conf

cp zoo_sample.cfg zoo.cfg

二、vi zoo.cfg

三、添加內容:

dataDir=/home/hadoop/zookeeper/data

dataLogDir=/home/hadoop/zookeeper/log

server.1=slave1:2888:3888 (主機名, 心跳端口、數據端口)

server.2=slave2:2888:3888

server.3=slave3:2888:3888

四、建立文件夾:

cd /home/hadoop/zookeeper/

mkdir -m 755 data

mkdir -m 755 log

五、在data文件夾下新建myid文件,myid的文件內容爲:

cd data

vi myid

添加內容:

1

 

3.1.7.   將集羣下發到其餘機器上

scp -r /home/hadoop/zookeeper hadoop@slave2:/home/hadoop/

scp -r /home/hadoop/zookeeper hadoop@slave3:/home/hadoop/

3.1.8.   修改其餘機器的配置文件

到slave2上:修改myid爲:2

到slave3上:修改myid爲:3

3.1.9.   啓動(每臺機器)

zkServer.sh start

3.1.10.          查看集羣狀態

一、  jps(查看進程)

二、  zkServer.sh status(查看集羣狀態,主從信息)

4.  zookeeper結構和命令

4.1. zookeeper特性

一、Zookeeper:一個leader,多個follower組成的集羣

二、全局數據一致:每一個server保存一份相同的數據副本,client不管鏈接到哪一個server,數據都是一致的

三、分佈式讀寫,更新請求轉發,由leader實施

四、更新請求順序進行,來自同一個client的更新請求按其發送順序依次執行

五、數據更新原子性,一次數據更新要麼成功,要麼失敗

六、實時性,在必定時間範圍內,client能讀到最新數據

 

4.2. zookeeper數據結構

一、層次化的目錄結構,命名符合常規文件系統規範(見下圖)

二、每一個節點在zookeeper中叫作znode,而且其有一個惟一的路徑標識

三、節點Znode能夠包含數據和子節點(可是EPHEMERAL類型的節點不能有子節點,下一頁詳細講解)

四、客戶端應用能夠在節點上設置監視器(後續詳細講解)           

4.3. 數據結構的圖

 

 

 

 

4.4. 節點類型

一、Znode有兩種類型:

短暫(ephemeral)(斷開鏈接本身刪除)

持久(persistent)(斷開鏈接不刪除)

二、Znode有四種形式的目錄節點(默認是persistent )

PERSISTENT

PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )

EPHEMERAL

EPHEMERAL_SEQUENTIAL

三、建立znode時設置順序標識,znode名稱後會附加一個值,順序號是一個單調遞增的計數器,由父節點維護

四、在分佈式系統中,順序號能夠被用於爲全部的事件進行全局排序,這樣客戶端能夠經過順序號推斷事件的順序

4.5. zookeeper命令行操做

運行 zkCli.sh –server <ip>進入命令行工具

 

一、使用 ls 命令來查看當前 ZooKeeper 中所包含的內容:

[zk: 202.115.36.251:2181(CONNECTED) 1] ls /

二、建立一個新的 znode ,使用 create /zk myData 。這個命令建立了一個新的 znode 節點「 zk 」以及與它關聯的字符串:

[zk: 202.115.36.251:2181(CONNECTED) 2] create /zk "myData「

三、咱們運行 get 命令來確認 znode 是否包含咱們所建立的字符串:

[zk: 202.115.36.251:2181(CONNECTED) 3] get /zk

#監聽這個節點的變化,當另一個客戶端改變/zk時,它會打出下面的

#WATCHER::

#WatchedEvent state:SyncConnected type:NodeDataChanged path:/zk

[zk: localhost:2181(CONNECTED) 4] get /zk watch

四、下面咱們經過 set 命令來對 zk 所關聯的字符串進行設置:

[zk: 202.115.36.251:2181(CONNECTED) 4] set /zk "zsl「

五、下面咱們將剛纔建立的 znode 刪除:

[zk: 202.115.36.251:2181(CONNECTED) 5] delete /zk

六、刪除節點:rmr

[zk: 202.115.36.251:2181(CONNECTED) 5] rmr /zk

 

 

 

 

 

 

4.6.  zookeeper-api應用

4.6.1.   基本使用

 org.apache.zookeeper.Zookeeper是客戶端入口主類,負責創建與server的會話

它提供了表 1 所示幾類主要方法  :

功能

描述

create

在本地目錄樹中建立一個節點

delete

刪除一個節點

exists

測試本地是否存在目標節點

get/set data

從目標節點上讀取 / 寫數據

get/set ACL

獲取 / 設置目標節點訪問控制列表信息

get children

檢索一個子節點上的列表

sync

等待要被傳送的數據

 

 

 

表 1 : ZooKeeper API 描述

 

 

4.6.2.   demo增刪改查

public class SimpleDemo {

    // 會話超時時間,設置爲與系統默認時間一致

    private static final int SESSION_TIMEOUT = 30000;

    // 建立 ZooKeeper 實例

    ZooKeeper zk;

    // 建立 Watcher 實例

    Watcher wh = new Watcher() {

        public void process(org.apache.zookeeper.WatchedEvent event)

        {

             System.out.println(event.toString());

        }

    };

    // 初始化 ZooKeeper 實例

    private void createZKInstance() throws IOException

    {

        zk = new ZooKeeper("weekend01:2181", SimpleDemo.SESSION_TIMEOUT, this.wh);

    }

    private void ZKOperations() throws IOException, InterruptedException, KeeperException

    {

        System.out.println("/n1. 建立 ZooKeeper 節點 (znode : zoo2, 數據: myData2 ,權限: OPEN_ACL_UNSAFE ,節點類型: Persistent");

        zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        System.out.println("/n2. 查看是否建立成功: ");

        System.out.println(new String(zk.getData("/zoo2", false, null)));

        System.out.println("/n3. 修改節點數據 ");

        zk.setData("/zoo2", "shenlan211314".getBytes(), -1);

        System.out.println("/n4. 查看是否修改爲功: ");

        System.out.println(new String(zk.getData("/zoo2", false, null)));

        System.out.println("/n5. 刪除節點 ");

        zk.delete("/zoo2", -1);

        System.out.println("/n6. 查看節點是否被刪除: ");

        System.out.println(" 節點狀態: [" + zk.exists("/zoo2", false) + "]");

    }

    private void ZKClose() throws InterruptedException

    {

        zk.close();

    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        SimpleDemo dm = new SimpleDemo();

        dm.createZKInstance();

        dm.ZKOperations();

        dm.ZKClose();

    }

}

 

Zookeeper的監聽器工做機制

 

 

監聽器是一個接口,咱們的代碼中能夠實現Wather這個接口,實現其中的process方法,方法中即咱們本身的業務邏輯

 

監聽器的註冊是在獲取數據的操做中實現:

getData(path,watch?)監聽的事件是:節點數據變化事件

getChildren(path,watch?)監聽的事件是:節點下的子節點增減變化事件

 

 

 

4.7. zookeeper應用案例(分佈式應用HA||分佈式鎖)

3.7.1 實現分佈式應用的(主節點HA)及客戶端動態更新主節點狀態

某分佈式系統中,主節點能夠有多臺,能夠動態上下線

任意一臺客戶端都能實時感知到主節點服務器的上下線

 

 

 

 

 

 

 

 

 

 

 

 

A、客戶端實現

public class AppClient {

    private String groupNode = "sgroup";

    private ZooKeeper zk;

    private Stat stat = new Stat();

    private volatile List<String> serverList;



    /**

     * 鏈接zookeeper

     */

    public void connectZookeeper() throws Exception {

        zk

= new ZooKeeper("localhost:4180,localhost:4181,localhost:4182", 5000, new Watcher() {

             public void process(WatchedEvent event) {

                 // 若是發生了"/sgroup"節點下的子節點變化事件, 更新server列表, 並從新註冊監聽

                 if (event.getType() == EventType.NodeChildrenChanged

                     && ("/" + groupNode).equals(event.getPath())) {

                     try {

                         updateServerList();

                     } catch (Exception e) {

                         e.printStackTrace();

                     }

                 }

             }

        });



        updateServerList();

    }



    /**

     * 更新server列表

     */

    private void updateServerList() throws Exception {

        List<String> newServerList = new ArrayList<String>();



        // 獲取並監聽groupNode的子節點變化

        // watch參數爲true, 表示監聽子節點變化事件.

        // 每次都須要從新註冊監聽, 由於一次註冊, 只能監聽一次事件, 若是還想繼續保持監聽, 必須從新註冊

        List<String> subList = zk.getChildren("/" + groupNode, true);

        for (String subNode : subList) {

             // 獲取每一個子節點下關聯的server地址

             byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);

             newServerList.add(new String(data, "utf-8"));

        }



        // 替換server列表

        serverList = newServerList;



        System.out.println("server list updated: " + serverList);

    }



    /**

     * client的工做邏輯寫在這個方法中

     * 此處不作任何處理, 只讓client sleep

     */

    public void handle() throws InterruptedException {

        Thread.sleep(Long.MAX_VALUE);

    }



    public static void main(String[] args) throws Exception {

        AppClient ac = new AppClient();

        ac.connectZookeeper();



        ac.handle();

    }

}

 

B、服務器端實現

public class AppServer {

    private String groupNode = "sgroup";

    private String subNode = "sub";



    /**

     * 鏈接zookeeper

     * @param address server的地址

     */

    public void connectZookeeper(String address) throws Exception {

        ZooKeeper zk = new ZooKeeper(

"localhost:4180,localhost:4181,localhost:4182",

5000, new Watcher() {

             public void process(WatchedEvent event) {

                 // 不作處理

             }

        });

        // 在"/sgroup"下建立子節點

        // 子節點的類型設置爲EPHEMERAL_SEQUENTIAL, 代表這是一個臨時節點, 且在子節點的名稱後面加上一串數字後綴

        // 將server的地址數據關聯到新建立的子節點上

        String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"),

             Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("create: " + createdPath);

    }

   

    /**

     * server的工做邏輯寫在這個方法中

     * 此處不作任何處理, 只讓server sleep

     */

    public void handle() throws InterruptedException {

        Thread.sleep(Long.MAX_VALUE);

    }

   

    public static void main(String[] args) throws Exception {

        // 在參數中指定server的地址

        if (args.length == 0) {

             System.err.println("The first argument must be server address");

             System.exit(1);

        }

       

        AppServer as = new AppServer();

        as.connectZookeeper(args[0]);

        as.handle();

    }

}

 

 

3.7.2分佈式共享鎖的簡單實現

ü  客戶端A

public class DistributedClient {

    // 超時時間

    private static final int SESSION_TIMEOUT = 5000;

    // zookeeper server列表

    private String hosts = "localhost:4180,localhost:4181,localhost:4182";

    private String groupNode = "locks";

    private String subNode = "sub";



    private ZooKeeper zk;

    // 當前client建立的子節點

    private String thisPath;

    // 當前client等待的子節點

    private String waitPath;



    private CountDownLatch latch = new CountDownLatch(1);



    /**

     * 鏈接zookeeper

     */

    public void connectZookeeper() throws Exception {

        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {

            public void process(WatchedEvent event) {

                try {

                    // 鏈接創建時, 打開latch, 喚醒wait在該latch上的線程

                    if (event.getState() == KeeperState.SyncConnected) {

                        latch.countDown();

                    }



                    // 發生了waitPath的刪除事件

                    if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {

                        doSomething();

                    }

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }

        });



        // 等待鏈接創建

        latch.await();



        // 建立子節點

        thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,

                CreateMode.EPHEMERAL_SEQUENTIAL);



        // wait一小會, 讓結果更清晰一些

        Thread.sleep(10);



        // 注意, 沒有必要監聽"/locks"的子節點的變化狀況

        List<String> childrenNodes = zk.getChildren("/" + groupNode, false);



        // 列表中只有一個子節點, 那確定就是thisPath, 說明client得到鎖

        if (childrenNodes.size() == 1) {

            doSomething();

        } else {

            String thisNode = thisPath.substring(("/" + groupNode + "/").length());

            // 排序

            Collections.sort(childrenNodes);

            int index = childrenNodes.indexOf(thisNode);

            if (index == -1) {

                // never happened

            } else if (index == 0) {

                // inddx == 0, 說明thisNode在列表中最小, 當前client得到鎖

                doSomething();

            } else {

                // 得到排名比thisPath前1位的節點

                this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);

                // 在waitPath上註冊監聽器, 當waitPath被刪除時, zookeeper會回調監聽器的process方法

                zk.getData(waitPath, true, new Stat());

            }

        }

    }



    private void doSomething() throws Exception {

        try {

            System.out.println("gain lock: " + thisPath);

            Thread.sleep(2000);

            // do something

        } finally {

            System.out.println("finished: " + thisPath);

            // 將thisPath刪除, 監聽thisPath的client將得到通知

            // 至關於釋放鎖

            zk.delete(this.thisPath, -1);

        }

    }



    public static void main(String[] args) throws Exception {

        for (int i = 0; i < 10; i++) {

            new Thread() {

                public void run() {

                    try {

                        DistributedClient dl = new DistributedClient();

                        dl.connectZookeeper();

                    } catch (Exception e) {

                        e.printStackTrace();

                    }

                }

            }.start();

        }



        Thread.sleep(Long.MAX_VALUE);

    }

}

ü  分佈式多進程模式實現:

public class DistributedClientMy {

   



    // 超時時間

    private static final int SESSION_TIMEOUT = 5000;

    // zookeeper server列表

    private String hosts = "spark01:2181,spark02:2181,spark03:2181";

    private String groupNode = "locks";

    private String subNode = "sub";

    private boolean haveLock = false;



    private ZooKeeper zk;

    // 當前client建立的子節點

    private volatile String thisPath;



    /**

     * 鏈接zookeeper

     */

    public void connectZookeeper() throws Exception {

        zk = new ZooKeeper("spark01:2181", SESSION_TIMEOUT, new Watcher() {

             public void process(WatchedEvent event) {

                 try {



                     // 子節點發生變化

                     if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {

                         // thisPath是不是列表中的最小節點

                         List<String> childrenNodes = zk.getChildren("/" + groupNode, true);

                         String thisNode = thisPath.substring(("/" + groupNode + "/").length());

                         // 排序

                         Collections.sort(childrenNodes);

                         if (childrenNodes.indexOf(thisNode) == 0) {

                             doSomething();

                             thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,

                                      CreateMode.EPHEMERAL_SEQUENTIAL);

                         }

                     }

                 } catch (Exception e) {

                     e.printStackTrace();

                 }

             }

        });



        // 建立子節點

        thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,

                 CreateMode.EPHEMERAL_SEQUENTIAL);



        // wait一小會, 讓結果更清晰一些

        Thread.sleep(new Random().nextInt(1000));



        // 監聽子節點的變化

        List<String> childrenNodes = zk.getChildren("/" + groupNode, true);



        // 列表中只有一個子節點, 那確定就是thisPath, 說明client得到鎖

        if (childrenNodes.size() == 1) {

             doSomething();

             thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,

                     CreateMode.EPHEMERAL_SEQUENTIAL);

        }

    }



    /**

     * 共享資源的訪問邏輯寫在這個方法中

     */

    private void doSomething() throws Exception {

        try {

             System.out.println("gain lock: " + thisPath);

             Thread.sleep(2000);

             // do something

        } finally {

             System.out.println("finished: " + thisPath);

             // 將thisPath刪除, 監聽thisPath的client將得到通知

             // 至關於釋放鎖

             zk.delete(this.thisPath, -1);

        }

    }



    public static void main(String[] args) throws Exception {

        DistributedClientMy dl = new DistributedClientMy();

        dl.connectZookeeper();

        Thread.sleep(Long.MAX_VALUE);

    }



   

}


動手練習

 

5.  zookeeper原理

Zookeeper雖然在配置文件中並無指定master和slave

可是,zookeeper工做時,是有一個節點爲leader,其餘則爲follower

Leader是經過內部的選舉機制臨時產生的

 

 

 

5.1. zookeeper的選舉機制(全新集羣paxos)

以一個簡單的例子來講明整個選舉的過程.
假設有五臺服務器組成的zookeeper集羣,它們的id從1-5,同時它們都是最新啓動的,也就是沒有歷史數據,在存放數據量這一點上,都是同樣的.假設這些服務器依序啓動,來看看會發生什麼.
1) 服務器1啓動,此時只有它一臺服務器啓動了,它發出去的報沒有任何響應,因此它的選舉狀態一直是LOOKING狀態
2) 服務器2啓動,它與最開始啓動的服務器1進行通訊,互相交換本身的選舉結果,因爲二者都沒有歷史數據,因此id值較大的服務器2勝出,可是因爲沒有達到超過半數以上的服務器都贊成選舉它(這個例子中的半數以上是3),因此服務器1,2仍是繼續保持LOOKING狀態.
3) 服務器3啓動,根據前面的理論分析,服務器3成爲服務器1,2,3中的老大,而與上面不一樣的是,此時有三臺服務器選舉了它,因此它成爲了此次選舉的leader.
4) 服務器4啓動,根據前面的分析,理論上服務器4應該是服務器1,2,3,4中最大的,可是因爲前面已經有半數以上的服務器選舉了服務器3,因此它只能接收當小弟的命了.
5) 服務器5啓動,同4同樣,當小弟.

5.2. 非全新集羣的選舉機制(數據恢復)

那麼,初始化的時候,是按照上述的說明進行選舉的,可是當zookeeper運行了一段時間以後,有機器down掉,從新選舉時,選舉過程就相對複雜了。

須要加入數據id、leader id和邏輯時鐘。

數據id:數據新的id就大,數據每次更新都會更新id。

Leader id:就是咱們配置的myid中的值,每一個機器一個。

邏輯時鐘:這個值從0開始遞增,每次選舉對應一個值,也就是說:  若是在同一次選舉中,那麼這個值應該是一致的 ;  邏輯時鐘值越大,說明這一次選舉leader的進程更新.

選舉的標準就變成:

                     一、邏輯時鐘小的選舉結果被忽略,從新投票

                     二、統一邏輯時鐘後,數據id大的勝出

                     三、數據id相同的狀況下,leader id大的勝出

根據這個規則選出leader。

相關文章
相關標籤/搜索