Zookeeper開發分佈式系統,動態服務上下線感知

什麼是Zookeeper

Zookeeper是一個分佈式開源框架,提供了協調分佈式應用的基本服務,它向外部應用暴露一組通用服務——分佈式同步(Distributed Synchronization)、命名服務(Naming Service)、集羣維護(Group Maintenance)等,簡化分佈式應用協調及其管理的難度,提供高性能的分佈式服務。ZooKeeper自己能夠以單機模式安裝運行,不過它的長處在於經過分佈式ZooKeeper集羣(一個Leader,多個Follower),基於必定的策略來保證ZooKeeper集羣的穩定性和可用性,從而實現分佈式應用的可靠性。node

Zookeeper簡介

一、Zookeeper是爲別的分佈式程序服務的
二、Zookeeper自己就是一個分佈式程序(只要有半數以上節點存活,zk就能正常服務)
三、Zookeeper所提供的服務涵蓋:主從協調、服務器節點動態上下線、統一配置管理、分佈式共享鎖、統> 一名稱服務等
四、雖說能夠提供各類服務,可是zookeeper在底層其實只提供了兩個功能:
4.1:管理(存儲,讀取)用戶程序提交的數據(相似namenode中存放的metadata)
4.2:爲用戶程序提供數據節點監聽服務apache

Zookeeper應用場景圖

Zookeeper集羣機制

Zookeeper集羣的角色: Leader 和 follower 
只要集羣中有半數以上節點存活,集羣就能提供服務bash

Zookeeper特性

一、Zookeeper:一個leader,多個follower組成的集羣
二、全局數據一致:每一個server保存一份相同的數據副本,client不管鏈接到哪一個server,數據都是一致的
三、分佈式讀寫,更新請求轉發,由leader實施
四、更新請求順序進行,來自同一個client的更新請求按其發送順序依次執行
五、數據更新原子性,一次數據更新要麼成功,要麼失敗
六、實時性,在必定時間範圍內,client能讀到最新數據

zookeeper的數據存儲機制

數據存儲形式服務器

zookeeper中對用戶的數據採用kv形式存儲app

只是zk有點特別:框架

key:是以路徑的形式表示的,那就覺得着,各key之間有父子關係,好比dom

/ 是頂層keyeclipse

用戶建的key只能在/ 下做爲子節點,好比建一個key: /aa  這個key能夠帶value數據ssh

 也能夠建一個key:   /bbsocket

也能夠建key: /aa/xx

zookeeper中,對每個數據key,稱做一個znode

綜上所述,zk中的數據存儲形式以下:

znode類型

zookeeper中的znode有多種類型:

  1. PERSISTENT  持久的:建立者就算跟集羣斷開聯繫,該類節點也會持久存在與zk集羣中
  2. EPHEMERAL  短暫的:建立者一旦跟集羣斷開聯繫,zk就會將這個節點刪除
  3. SEQUENTIAL  帶序號的:這類節點,zk會自動拼接上一個序號,並且序號是遞增的

組合類型:

    PERSISTENT  :持久不帶序號

    EPHEMERAL  :短暫不帶序號

    PERSISTENT  且 SEQUENTIAL   :持久且帶序號

    EPHEMERAL  且 SEQUENTIAL  :短暫且帶序號

zookeeper的集羣部署

集羣選舉示意圖

解壓Zokeeper安裝包到apps目錄下
tar -zxvf zookeeper-3.4.6.tar.gz -C apps

cd /root/apps/zookeeper-3.4.6/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
修改dataDir=/root/zkdata
在後面加上集羣的機器:2888是leader和follower通信端口,3888是投票的

server.1=hdp-01:2888:3888
server.2=hdp-02:2888:3888
server.3=hdp-03:2888:3888

對3臺節點,都建立目錄 mkdir /root/zkdata
對3臺節點,在工做目錄中生成myid文件,但內容要分別爲各自的id: 1,2,3

echo 1 > /root/zkdata/myid
echo 2 > /root/zkdata/myid
echo 3 > /root/zkdata/myid

從hdp20-01上scp安裝目錄到其餘兩個節點
cd apps
scp -r zookeeper-3.4.6/ hdp-02:$PWD
scp -r zookeeper-3.4.6/ hdp-03:$PWD

啓動zookeeper集羣
zookeeper沒有提供自動批量啓動腳本,須要手動一臺一臺地起zookeeper進程
在每一臺節點上,運行命令:

cd /root/apps/zookeeper-3.4.6
bin/zkServer.sh start
啓動後,用jps應該能看到一個進程:QuorumPeerMain

可是,光有進程不表明zk已經正常服務,須要用命令檢查狀態:
bin/zkServer.sh status
能看到角色模式:爲leader或follower,即正常了。

本身寫個腳本,一鍵啓動
vi zkmanage.sh

#!/bin/bash
for host in hdp-01 hdp-02 hdp-03
do
echo "${host}:$1ing....."
ssh $host "/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
done

中止命令:sh zjmanage.sh stop
加個可執行權限:chmod +zkmanage.sh
啓動命令:./zkmanage.sh start
可是出現沒有Java環境變量問題,修改配置文件
vi zkmanage.sh
修改配置以下

#!/bin/bash
for host in hdp-01 hdp-02 hdp-03
do
echo "${host}:$1ing....."
ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
done

sleep 2
for host in hdp-01 hdp-02 hdp-03
do
ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh status"
done

啓動集羣結果

hdp-01:starting.....
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
hdp-02:starting.....
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
hdp-03:starting.....
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: leader
JMX enabled by default
Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower

Zookeeper的Java客戶端操做代碼

public class ZookeeperCliDemo {
    ZooKeeper zk =null;
    @Before
    public void init() throws Exception {
       zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
    }
    @Test
    public void testCreate() throws Exception {
        //參數1:要建立的節點路徑;參數2:數據;參數3:訪問權限;參數4:節點類型
        String create = zk.create("/eclipse", "hello eclipse".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(create);
        zk.close();
    }

    @Test
    public void testUpdate() throws Exception {
        //參數1:節點路徑;參數2:數據;參數3:所要修改的版本,-1表示任意版本
        zk.setData("/eclipse","我喜歡青青".getBytes(),-1);
        zk.close();
    }
   @Test
    public void testGet() throws Exception {
       //參數1:節點路徑;參數2:事件監聽;參數3:所要修改的版本,null表示最新版本
        byte[] data = zk.getData("/eclipse", false, null);
        System.out.println(new String(data,"UTF-8"));
        zk.close();
    }

    @Test
    public void testListChildren() throws KeeperException, InterruptedException {
        //參數1:節點路徑;參數2:是否要監聽
        //注意:返回的結果只有子節點的名字,不帶全路徑
        List<String> children = zk.getChildren("/cc", false);
        for(String child:children){
            System.out.println(child);
        }
        zk.close();
    }

    @Test
    public void testRm() throws KeeperException, InterruptedException {
        zk.delete("/eclipse",-1);
        zk.close();
    }
}

Zookeeper監聽功能代碼

public class ZookeeperWatchDemo {
    ZooKeeper zk =null;
    @Before
    public void init() throws Exception {
        zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000,  new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected
                        && watchedEvent.getType() == Event.EventType.NodeDataChanged) {
                    System.out.println("收到事件所發生節點的路徑" + watchedEvent.getPath());
                    System.out.println("收到事件所發生節點的狀態" + watchedEvent.getState());
                    System.out.println("收到事件所發生節點的類型" + watchedEvent.getType());
                    System.out.println("watch事件通知。。換照片");
                    try {
                        zk.getData("/mygirls", true, null);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }else if(watchedEvent.getState()==Event.KeeperState.SyncConnected &&
                        watchedEvent.getType()==Event.EventType.NodeChildrenChanged){
                    System.out.println("收到事件所發生節點的路徑" + watchedEvent.getPath());
                    System.out.println("收到事件所發生節點的狀態" + watchedEvent.getState());
                    System.out.println("收到事件所發生節點的類型" + watchedEvent.getType());

                }
            }
        });
    }

    @Test
    public void testGetWatch() throws Exception {
        byte[] data = zk.getData("/mygirls",true, null);
        List<String> children = zk.getChildren("/mygirls", true);
        System.out.println(new String(data,"UTF-8"));
        Thread.sleep(Long.MAX_VALUE);
    }
}

Zookeeper開發分佈式系統案例代碼,動態上下線感知

服務代碼

public class TimeQueryServer {
    ZooKeeper zk=null;
    public void connectZk()throws Exception{
        zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
    }

    public void registerServerInfo(String hostname,String port)throws Exception{
        /**
         * 先判斷註冊節點的父節點是否存在,若是不存在,則建立持久節點
         */
        Stat exists = zk.exists("/servers", false);
        if(exists==null){
            zk.create("/servers",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        }
        /**
         * 註冊服務器數據到zk的約定註冊節點下
         */
        String create = zk.create("/servers/server", (hostname + ":" + port).getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname+" 服務器向zk 註冊成功,註冊節點爲:/servers"+create);
    }
    public static void main(String[] args) throws Exception {
        //1.構造zk鏈接
        TimeQueryServer timeQueryServer = new TimeQueryServer();
        timeQueryServer.connectZk();
        //2.註冊服務器信息
        timeQueryServer.registerServerInfo("192.168.150.3","44772");
        //3.啓動業務線程開始處理業務
        new TimeQueryService(44772).start();
    }
}
public class TimeQueryService extends Thread {
    int port=0;
    public TimeQueryService(int port){
        this.port=port;
    }
    @Override
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(port);
            System.out.println("業務線程已經綁定端口"+port+"開始接受客戶端請求..");
            while (true){
                Socket sc = ss.accept();
                InputStream inputStream = sc.getInputStream();
                OutputStream outputStream = sc.getOutputStream();
                outputStream.write(new Date().toString().getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消費者代碼

public class Consumer {

    //定義一個list用於存放在線的服務器列表
    private volatile ArrayList<String>onlineServers=new ArrayList<String>();
    ZooKeeper zk=null;
    public void connectZk()throws Exception{
        zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState()==Event.KeeperState.SyncConnected && watchedEvent.getType()==Event.EventType.NodeChildrenChanged){
                    try{
                        //事件回調邏輯中,再次查詢zk上在線服務器節點便可,查詢邏輯中又再次註冊子節點變化事件監聽
                        getOnlineServers();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        });
    }
    //查詢在線服務器列表
    public void getOnlineServers()throws Exception{
        List<String> children = zk.getChildren("/servers", true);
        ArrayList<String> servers = new ArrayList<String>();
        for (String child:children){
            byte[] data = zk.getData("/servers/" + child, false, null);
            String serverInfo=new String(data);
            servers.add(serverInfo);
        }
        onlineServers=servers;
        System.out.println("查詢了一次zk,當前在線的服務器有:"+servers);

    }

    public void setRequest() throws Exception {
        Random random = new Random();
        while (true){
            try {
                int nextInt=random.nextInt(onlineServers.size());
                String server=onlineServers.get(nextInt);
                String hostname=server.split(":")[0];
                int port=Integer.parseInt(server.split(":")[1]);
                System.out.println("本次請求挑選的服務器爲:"+server);

                Socket socket = new Socket(hostname, port);
                OutputStream out = socket.getOutputStream();
                InputStream in = socket.getInputStream();
                out.write("hahaha".getBytes());
                out.flush();

                byte[] buf = new byte[256];
                int read=in.read(buf);
                String s = new String(buf, 0, read);
                System.out.println("服務器響應時間爲:"+s);
                out.close();
                in.close();
                socket.close();
                Thread.sleep(2000);
            }catch (Exception e){

            }

        }
    }
    public static void main(String[] args) throws Exception {
        //構造zk鏈接對象
        Consumer consumer = new Consumer();
        consumer.connectZk();
        //查詢在線服務器列表
        consumer.getOnlineServers();
        //處理業務
        consumer.setRequest();
    }
}

pom

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>
    </dependencies>

啓動多個服務

控制檯輸出

192.168.150.3 服務器向zk 註冊成功,註冊節點爲:/servers/servers/server0000000018
業務線程已經綁定端口44772開始接受客戶端請求..

192.168.150.3 服務器向zk 註冊成功,註冊節點爲:/servers/servers/server0000000019
業務線程已經綁定端口44773開始接受客戶端請求..

192.168.150.3 服務器向zk 註冊成功,註冊節點爲:/servers/servers/server0000000020
業務線程已經綁定端口44774開始接受客戶端請求..

消費者啓動

控制檯輸出

查詢了一次zk,當前在線的服務器有:[192.168.150.3:44773, 192.168.150.3:44772, 192.168.150.3:44774]
本次請求挑選的服務器爲:192.168.150.3:44772
服務器響應時間爲:Mon Jun 03 20:03:21 CST 2019
本次請求挑選的服務器爲:192.168.150.3:44773
服務器響應時間爲:Mon Jun 03 20:03:23 CST 2019
本次請求挑選的服務器爲:192.168.150.3:44773
服務器響應時間爲:Mon Jun 03 20:03:25 CST 2019
本次請求挑選的服務器爲:192.168.150.3:44772
服務器響應時間爲:Mon Jun 03 20:03:27 CST 2019

下線一個服務後,控制檯輸出

查詢了一次zk,當前在線的服務器有:[192.168.150.3:44773, 192.168.150.3:44772]
本次請求挑選的服務器爲:192.168.150.3:44773
服務器響應時間爲:Mon Jun 03 20:04:19 CST 2019
本次請求挑選的服務器爲:192.168.150.3:44773
服務器響應時間爲:Mon Jun 03 20:04:21 CST 2019
本次請求挑選的服務器爲:192.168.150.3:44773
服務器響應時間爲:Mon Jun 03 20:04:23 CST 2019
本次請求挑選的服務器爲:192.168.150.3:44773

版權@須臾之餘https://my.oschina.net/u/3995125

相關文章
相關標籤/搜索