zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維java
多個訂閱者對象同時監聽同一主題對象,主題對象狀態變化時通知全部訂閱者對象更新自身狀態。發佈方和訂閱方獨立封裝、獨立改變,當一個對象的改變須要同時改變其餘對象,而且它不知道有多少個對象須要改變時,可使用發佈訂閱模式。mysql
在分佈式系統中的頂級應用有配置管理和服務發現。sql
配置管理:指集羣中的機器擁有某些配置,而且這些配置信息須要動態地改變,那麼咱們就可使用發佈訂閱模式把配置作統一的管理,讓這些機器訂閱配置信息的改變,可是配置改變時這些機器獲得通知並更新本身的配置。segmentfault
服務發現:指對集羣中的服務上下線作統一管理,每一個工做服務器均可以做爲數據的發佈方,向集羣註冊本身的基本信息,而讓某些監控服務器做爲訂閱方,訂閱工做服務器的基本信息。當工做服務器的基本信息改變時,如服務上下線、服務器的角色或服務範圍變動,那麼監控服務器能夠獲得通知並響應這些變化。服務器
左邊表明Zookeeper集羣,右側表明服務器集羣。其中前3個爲工做服務器Work Server,綠色爲管理服務器Manage Server,最下面的是控制服務器Control Server。架構
config節點,用於配置管理。Manage Server經過config節點下發配置信息,Work Server能夠經過訂閱config節點的改變來更新本身的配置。負載均衡
Servers節點,用於服務發現,每一個Work Server在啓動時都會在Servers節點下建立一個臨時節點,Manage Server充當monitor,經過監聽Servers節點的子節點列表的變化來更新本身內存中工做服務器列表的信息。運維
經過Control Server由Command節點做爲中介,向Manage Server發送控制指令。Control Server向command節點寫入命令信息,Manage Server訂閱command節點的數據改變來監聽並執行命令。分佈式
Manage Server程序主體流程ide
WorkServer對應架構圖的Work Server;
ManageServer對應架構圖的Manage Server;
ServerConfig用於記錄Work Server的配置信息;
ServerData用於記錄Work Server的基本信息;
SubscribeZkClient做爲示例程序入口服務站啓動Work Server和Manage Server
/** * 配置信息 */ public class ServerConfig { private String dbUrl; private String dbPwd; private String dbUser; public String getDbUrl() { return dbUrl; } public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl; } public String getDbPwd() { return dbPwd; } public void setDbPwd(String dbPwd) { this.dbPwd = dbPwd; } public String getDbUser() { return dbUser; } public void setDbUser(String dbUser) { this.dbUser = dbUser; } @Override public String toString() { return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd + ", dbUser=" + dbUser + "]"; } }
/** * 服務器基本信息 */ public class ServerData { private String address; private Integer id; private String name; public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "ServerData [address=" + address + ", id=" + id + ", name=" + name + "]"; } }
/** * 表明工做服務器 */ public class WorkServer { private ZkClient zkClient; // ZooKeeper private String configPath; // ZooKeeper集羣中servers節點的路徑 private String serversPath; // 當前工做服務器的基本信息 private ServerData serverData; // 當前工做服務器的配置信息 private ServerConfig serverConfig; private IZkDataListener dataListener; public WorkServer(String configPath, String serversPath, ServerData serverData, ZkClient zkClient, ServerConfig initConfig) { this.zkClient = zkClient; this.serversPath = serversPath; this.configPath = configPath; this.serverConfig = initConfig; this.serverData = serverData; this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { } public void handleDataChange(String dataPath, Object data) throws Exception { String retJson = new String((byte[])data); ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class); updateConfig(serverConfigLocal); System.out.println("new Work server config is:"+serverConfig.toString()); } }; } // 啓動服務器 public void start() { System.out.println("work server start..."); initRunning(); } // 中止服務器 public void stop() { System.out.println("work server stop..."); zkClient.unsubscribeDataChanges(configPath, dataListener); // 取消監聽config節點 } // 服務器初始化 private void initRunning() { registMe(); // 註冊本身 zkClient.subscribeDataChanges(configPath, dataListener); // 訂閱config節點的改變事件 } // 啓動時向zookeeper註冊本身的註冊函數 private void registMe() { String mePath = serversPath.concat("/").concat(serverData.getAddress()); try { zkClient.createEphemeral(mePath, JSON.toJSONString(serverData) .getBytes()); } catch (ZkNoNodeException e) { zkClient.createPersistent(serversPath, true); registMe(); } } // 更新本身的配置信息 private void updateConfig(ServerConfig serverConfig) { this.serverConfig = serverConfig; } }
public class ManageServer { // zookeeper的servers節點路徑 private String serversPath; // zookeeper的command節點路徑 private String commandPath; // zookeeper的config節點路徑 private String configPath; private ZkClient zkClient; private ServerConfig config; // 用於監聽servers節點的子節點列表的變化 private IZkChildListener childListener; // 用於監聽command節點數據內容的變化 private IZkDataListener dataListener; // 工做服務器的列表 private List<String> workServerList; public ManageServer(String serversPath, String commandPath, String configPath, ZkClient zkClient, ServerConfig config) { this.serversPath = serversPath; this.commandPath = commandPath; this.zkClient = zkClient; this.config = config; this.configPath = configPath; this.childListener = new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { // TODO Auto-generated method stub workServerList = currentChilds; // 更新內存中工做服務器列表 System.out.println("work server list changed, new list is "); execList(); } }; this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { // TODO Auto-generated method stub // ignore; } public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub String cmd = new String((byte[]) data); System.out.println("cmd:"+cmd); exeCmd(cmd); // 執行命令 } }; } private void initRunning() { zkClient.subscribeDataChanges(commandPath, dataListener); zkClient.subscribeChildChanges(serversPath, childListener); } /* * 1: list 2: create 3: modify */ private void exeCmd(String cmdType) { if ("list".equals(cmdType)) { execList(); } else if ("create".equals(cmdType)) { execCreate(); } else if ("modify".equals(cmdType)) { execModify(); } else { System.out.println("error command!" + cmdType); } } // 列出工做服務器列表 private void execList() { System.out.println(workServerList.toString()); } // 建立config節點 private void execCreate() { if (!zkClient.exists(configPath)) { try { zkClient.createPersistent(configPath, JSON.toJSONString(config) .getBytes()); } catch (ZkNodeExistsException e) { zkClient.writeData(configPath, JSON.toJSONString(config) .getBytes()); // config節點已經存在,則寫入內容就能夠了 } catch (ZkNoNodeException e) { String parentDir = configPath.substring(0, configPath.lastIndexOf('/')); zkClient.createPersistent(parentDir, true); execCreate(); } } } // 修改config節點內容 private void execModify() { // 咱們隨意修改config的一個屬性就能夠了 config.setDbUser(config.getDbUser() + "_modify"); try { zkClient.writeData(configPath, JSON.toJSONString(config).getBytes()); } catch (ZkNoNodeException e) { execCreate(); // 寫入時config節點還未存在,則建立它 } } // 啓動工做服務器 public void start() { initRunning(); } // 中止工做服務器 public void stop() { zkClient.unsubscribeChildChanges(serversPath, childListener); zkClient.unsubscribeDataChanges(commandPath, dataListener); } }
/** * 調度類 */ public class SubscribeZkClient { private static final int CLIENT_QTY = 5; // Work Server數量 private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181"; private static final String CONFIG_PATH = "/config"; private static final String COMMAND_PATH = "/command"; private static final String SERVERS_PATH = "/servers"; public static void main(String[] args) throws Exception { List<ZkClient> clients = new ArrayList<ZkClient>(); List<WorkServer> workServers = new ArrayList<WorkServer>(); ManageServer manageServer = null; try { // 建立一個默認的配置 ServerConfig initConfig = new ServerConfig(); initConfig.setDbPwd("123456"); initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb"); initConfig.setDbUser("root"); // 實例化一個Manage Server ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer()); manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig); manageServer.start(); // 啓動Manage Server // 建立指定個數的工做服務器 for ( int i = 0; i < CLIENT_QTY; ++i ) { ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer()); clients.add(client); ServerData serverData = new ServerData(); serverData.setId(i); serverData.setName("WorkServer#"+i); serverData.setAddress("192.168.1."+i); WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig); workServers.add(workServer); workServer.start(); // 啓動工做服務器 } System.out.println("敲回車鍵退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); } finally { System.out.println("Shutting down..."); for ( WorkServer workServer : workServers ) { try { workServer.stop(); } catch (Exception e) { e.printStackTrace(); } } for ( ZkClient client : clients ) { try { client.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
咱們用zookeeper的命令行客戶端向Manage Server下達指令。
執行zkCli命令:
create /command list
java控制檯輸出:
cmd:list [192.168.1.1, 192.168.1.0, 192.168.1.3, 192.168.1.2, 192.168.1.4]
執行zkCli命令:
set /command create
java控制檯輸出:
cmd:create new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root] new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root] new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root] new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root] new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
執行zkCli命令:
set /command modify
java控制檯輸出:
cmd:modify new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root] new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root] new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root] new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root] new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維