zookeeper系列(三)實戰數據發佈訂閱

zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維java

數據發佈訂閱

多個訂閱者對象同時監聽同一主題對象,主題對象狀態變化時通知全部訂閱者對象更新自身狀態。發佈方和訂閱方獨立封裝、獨立改變,當一個對象的改變須要同時改變其餘對象,而且它不知道有多少個對象須要改變時,可使用發佈訂閱模式。mysql

在分佈式系統中的頂級應用有配置管理和服務發現。sql

配置管理:指集羣中的機器擁有某些配置,而且這些配置信息須要動態地改變,那麼咱們就可使用發佈訂閱模式把配置作統一的管理,讓這些機器訂閱配置信息的改變,可是配置改變時這些機器獲得通知並更新本身的配置。segmentfault

服務發現:指對集羣中的服務上下線作統一管理,每一個工做服務器均可以做爲數據的發佈方,向集羣註冊本身的基本信息,而讓某些監控服務器做爲訂閱方,訂閱工做服務器的基本信息。當工做服務器的基本信息改變時,如服務上下線、服務器的角色或服務範圍變動,那麼監控服務器能夠獲得通知並響應這些變化。服務器

架構圖

clipboard.png

左邊表明Zookeeper集羣,右側表明服務器集羣。其中前3個爲工做服務器Work Server,綠色爲管理服務器Manage Server,最下面的是控制服務器Control Server。架構

config節點,用於配置管理。Manage Server經過config節點下發配置信息,Work Server能夠經過訂閱config節點的改變來更新本身的配置。負載均衡

Servers節點,用於服務發現,每一個Work Server在啓動時都會在Servers節點下建立一個臨時節點,Manage Server充當monitor,經過監聽Servers節點的子節點列表的變化來更新本身內存中工做服務器列表的信息。運維

經過Control Server由Command節點做爲中介,向Manage Server發送控制指令。Control Server向command節點寫入命令信息,Manage Server訂閱command節點的數據改變來監聽並執行命令。分佈式

流程圖

Manage Server程序主體流程ide

clipboard.png

核心類圖

clipboard.png

WorkServer對應架構圖的Work Server;
ManageServer對應架構圖的Manage Server;
ServerConfig用於記錄Work Server的配置信息;
ServerData用於記錄Work Server的基本信息;
SubscribeZkClient做爲示例程序入口服務站啓動Work Server和Manage Server

實現代碼

/**
 * 配置信息
 */
public class ServerConfig {
    
    private String dbUrl;
    private String dbPwd;
    private String dbUser;
    public String getDbUrl() {
        return dbUrl;
    }
    public void setDbUrl(String dbUrl) {
        this.dbUrl = dbUrl;
    }
    public String getDbPwd() {
        return dbPwd;
    }
    public void setDbPwd(String dbPwd) {
        this.dbPwd = dbPwd;
    }
    public String getDbUser() {
        return dbUser;
    }
    public void setDbUser(String dbUser) {
        this.dbUser = dbUser;
    }
    
    @Override
    public String toString() {
        return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd
                + ", dbUser=" + dbUser + "]";
    }

}
/**
 * 服務器基本信息
 */
public class ServerData {
    
    private String address;
    private Integer id;
    private String name;

    public String getAddress() {
        return address;
    }
    public void setAddress(String address) {
        this.address = address;
    }
    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    
    @Override
    public String toString() {
        return "ServerData [address=" + address + ", id=" + id + ", name="
                + name + "]";
    }

}
/**
 * 表明工做服務器
 */
public class WorkServer {

    private ZkClient zkClient;
    // ZooKeeper
    private String configPath;
    // ZooKeeper集羣中servers節點的路徑
    private String serversPath;
    // 當前工做服務器的基本信息
    private ServerData serverData;
    // 當前工做服務器的配置信息
    private ServerConfig serverConfig;
    private IZkDataListener dataListener;

    public WorkServer(String configPath, String serversPath,
            ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {
        this.zkClient = zkClient;
        this.serversPath = serversPath;
        this.configPath = configPath;
        this.serverConfig = initConfig;
        this.serverData = serverData;

        this.dataListener = new IZkDataListener() {

            public void handleDataDeleted(String dataPath) throws Exception {

            }

            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                String retJson = new String((byte[])data);
                ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class);
                updateConfig(serverConfigLocal);
                System.out.println("new Work server config is:"+serverConfig.toString());
                
            }
        };

    }

    // 啓動服務器
    public void start() {
        System.out.println("work server start...");
        initRunning();
    }

    // 中止服務器
    public void stop() {
        System.out.println("work server stop...");
        zkClient.unsubscribeDataChanges(configPath, dataListener); // 取消監聽config節點
    }

    // 服務器初始化
    private void initRunning() {
        registMe(); // 註冊本身
        zkClient.subscribeDataChanges(configPath, dataListener); // 訂閱config節點的改變事件
    }

    // 啓動時向zookeeper註冊本身的註冊函數
    private void registMe() {
        String mePath = serversPath.concat("/").concat(serverData.getAddress());

        try {
            zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
                    .getBytes());
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(serversPath, true);
            registMe();
        }
    }

    // 更新本身的配置信息
    private void updateConfig(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
    }

}
public class ManageServer {

    // zookeeper的servers節點路徑
    private String serversPath;
    // zookeeper的command節點路徑
    private String commandPath;
    // zookeeper的config節點路徑
    private String configPath;
    private ZkClient zkClient;
    private ServerConfig config;
    // 用於監聽servers節點的子節點列表的變化
    private IZkChildListener childListener;
    // 用於監聽command節點數據內容的變化
    private IZkDataListener dataListener;
    // 工做服務器的列表
    private List<String> workServerList;

    public ManageServer(String serversPath, String commandPath,
            String configPath, ZkClient zkClient, ServerConfig config) {
        this.serversPath = serversPath;
        this.commandPath = commandPath;
        this.zkClient = zkClient;
        this.config = config;
        this.configPath = configPath;
        this.childListener = new IZkChildListener() {

            public void handleChildChange(String parentPath,
                    List<String> currentChilds) throws Exception {
                // TODO Auto-generated method stub
                workServerList = currentChilds; // 更新內存中工做服務器列表
                
                System.out.println("work server list changed, new list is ");
                execList();

            }
        };
        this.dataListener = new IZkDataListener() {

            public void handleDataDeleted(String dataPath) throws Exception {
                // TODO Auto-generated method stub
                // ignore;
            }

            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                // TODO Auto-generated method stub
                String cmd = new String((byte[]) data);
                System.out.println("cmd:"+cmd);
                exeCmd(cmd); // 執行命令

            }
        };

    }

    private void initRunning() {
        zkClient.subscribeDataChanges(commandPath, dataListener);
        zkClient.subscribeChildChanges(serversPath, childListener);
    }

    /*
     * 1: list 2: create 3: modify
     */
    private void exeCmd(String cmdType) {
        if ("list".equals(cmdType)) {
            execList();

        } else if ("create".equals(cmdType)) {
            execCreate();
        } else if ("modify".equals(cmdType)) {
            execModify();
        } else {
            System.out.println("error command!" + cmdType);
        }

    }

    // 列出工做服務器列表
    private void execList() {
        System.out.println(workServerList.toString());
    }

    // 建立config節點
    private void execCreate() {
        if (!zkClient.exists(configPath)) {
            try {
                zkClient.createPersistent(configPath, JSON.toJSONString(config)
                        .getBytes());
            } catch (ZkNodeExistsException e) {
                zkClient.writeData(configPath, JSON.toJSONString(config)
                        .getBytes()); // config節點已經存在,則寫入內容就能夠了
            } catch (ZkNoNodeException e) {
                String parentDir = configPath.substring(0,
                        configPath.lastIndexOf('/'));
                zkClient.createPersistent(parentDir, true);
                execCreate();
            }
        }
    }

    // 修改config節點內容
    private void execModify() {
        // 咱們隨意修改config的一個屬性就能夠了
        config.setDbUser(config.getDbUser() + "_modify");

        try {
            zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
        } catch (ZkNoNodeException e) {
            execCreate(); // 寫入時config節點還未存在,則建立它
        }
    }

    // 啓動工做服務器
    public void start() {
        initRunning();
    }

    // 中止工做服務器
    public void stop() {
        zkClient.unsubscribeChildChanges(serversPath, childListener);
        zkClient.unsubscribeDataChanges(commandPath, dataListener);
    }

}
/**
 * 調度類
 */
public class SubscribeZkClient {
    
       private static final int  CLIENT_QTY = 5; // Work Server數量

        private static final String  ZOOKEEPER_SERVER = "192.168.1.105:2181";
        
        private static final String  CONFIG_PATH = "/config";
        private static final String  COMMAND_PATH = "/command";
        private static final String  SERVERS_PATH = "/servers";
           
        public static void main(String[] args) throws Exception {

            List<ZkClient>  clients = new ArrayList<ZkClient>();
            List<WorkServer>  workServers = new ArrayList<WorkServer>();
            ManageServer manageServer = null;

            try {

                // 建立一個默認的配置
                ServerConfig initConfig = new ServerConfig();
                initConfig.setDbPwd("123456");
                initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
                initConfig.setDbUser("root");

                // 實例化一個Manage Server
                ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
                manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);
                manageServer.start(); // 啓動Manage Server

                // 建立指定個數的工做服務器
                for ( int i = 0; i < CLIENT_QTY; ++i ) {
                    ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
                    clients.add(client);
                    ServerData serverData = new ServerData();
                    serverData.setId(i);
                    serverData.setName("WorkServer#"+i);
                    serverData.setAddress("192.168.1."+i);

                    WorkServer  workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
                    workServers.add(workServer);
                    workServer.start();    // 啓動工做服務器
                    
                }

                System.out.println("敲回車鍵退出!\n");
                new BufferedReader(new InputStreamReader(System.in)).readLine();
                
            } finally {
                System.out.println("Shutting down...");

                for ( WorkServer workServer : workServers ) {
                    try {
                        workServer.stop();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }               
                }

                for ( ZkClient client : clients ) {
                    try {
                        client.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    
                }
            }
        }    

}

咱們用zookeeper的命令行客戶端向Manage Server下達指令。
執行zkCli命令:

create /command list

java控制檯輸出:

cmd:list
[192.168.1.1, 192.168.1.0, 192.168.1.3, 192.168.1.2, 192.168.1.4]

執行zkCli命令:

set /command create

java控制檯輸出:

cmd:create
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]

執行zkCli命令:

set /command modify

java控制檯輸出:

cmd:modify
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]

zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維

相關文章
相關標籤/搜索