一、需求背景node
有一個集羣(N臺服務器),須要訪問一個外部接口獲取數據,可是每一個時間點只須要一個機器來執行任務就好了,不須要全部服務器同時執行,由於集羣共享數據庫。數據庫
二、思路緩存
經過Zookeeper選取鏈接序列號最小的服務器做爲當前時間執行任務的機器,若是當前機器宕機,經過Zookeeper自動發現機制,從新選擇新的執行機器,除非整個集羣掛掉;服務器
(固然,改思路可讓集羣處理同一個任務集的不一樣部分,如N1處理1...N,N2處理N+1....N2等)ide
三、代碼this
//鏈接管理客戶端 public class ZkClientManager implements Watcher,InitializingBean { private Logger log = LoggerFactory.getLogger(getClass()); private final String ROOT_PATH = "/car_position"; private final String SUB_PATH = ROOT_PATH + "/sub_system_"; @Value("${zookeeper_address}") public String CONNECT_PATH = "127.0.0.1:2181"; private Integer SESSION_TIME_OUT = 10000; private ZooKeeper zk = null; private String SELF_PATH = null; private boolean createRootPath(String data) throws KeeperException, InterruptedException { if (zk.exists(ROOT_PATH, true) == null) { String rootpath = zk.create(ROOT_PATH, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); log.info("------#建立根節點成功rootpath:" + rootpath); } else { log.info("------#根路徑節點已經存在,不在重複建立"); } return true; } private boolean createSubNode() throws KeeperException, InterruptedException { SELF_PATH = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); log.info("------#建立當前zk節點路徑成功,path=" + SELF_PATH); List<String> paths = zk.getChildren(ROOT_PATH, new ZkNodeWatcher(zk, SELF_PATH)); WangKuoGpsUtil.sortAndCalcPath(SELF_PATH,paths); return true; } private void deletePath() { try { zk.delete(SELF_PATH, -1); log.info("-------#刪除PATH=" + SELF_PATH); } catch (Exception e) { log.error("------#刪除zk節點路徑異常",e); } } private void releaseConn() { if (zk != null) { try { zk.close(); } catch (InterruptedException e) { log.error("------#zookeeper關閉鏈接異常"); } } } public void process(WatchedEvent event) { Event.KeeperState state = event.getState(); Event.EventType type = event.getType(); if (Event.KeeperState.SyncConnected == state) { if (Event.EventType.None == type) { log.info("------#成功鏈接上zk服務器"); } } else if (Event.KeeperState.Disconnected == state) { log.info("------#與ZK服務器斷開鏈接"); } else if (Event.KeeperState.Expired == state) { log.info("會話失效"); } } @Override public void afterPropertiesSet() throws Exception { log.info("------#Zookeeper鏈接服務器地址{},timeout={}",CONNECT_PATH,SESSION_TIME_OUT); zk = new ZooKeeper(CONNECT_PATH, SESSION_TIME_OUT, this); log.info("------#初始化zookeeper鏈接成功"); createRootPath(""); createSubNode(); } }
//監聽器,接收節點變動通知,收到通知後從新計算 public class ZkNodeWatcher implements Watcher { private Logger log = LoggerFactory.getLogger(getClass()); private ZooKeeper zooKeeper; private String selfPath; public ZkNodeWatcher(ZooKeeper zooKeeper,String selfPath){ this.zooKeeper = zooKeeper; this.selfPath =selfPath; } public void process(WatchedEvent event) { Event.KeeperState state = event.getState(); Event.EventType type = event.getType(); try { log.info("------#Zookeeper NodeWather,state={},type={}",state,type); if(Event.EventType.NodeChildrenChanged == type){ List<String> paths = zooKeeper.getChildren(event.getPath(), this); WangKuoGpsUtil.sortAndCalcPath(selfPath,paths); } } catch (Exception e) { log.error("------#處理時間異常",e); } } }
//對服務器在Zookeeper上的須要進行排序,並計算出當前節點的排序位 public class WangKuoGpsUtil { private static Logger log = LoggerFactory.getLogger(WangKuoGpsUtil.class); public static void sortAndCalcPath(String nodePath,List<String> paths) { log.info(" ------#nodePath={} 監聽節點--->{} ",nodePath,JSONObject.toJSON(paths)); Collections.sort(paths); GpsQueueCache.setNodeCount(paths.size()); GpsQueueCache.setLastTime(0L);//強制從數據庫中拉取 for(int i=0;i<paths.size();i++){ if(nodePath.contains(paths.get(i))){ GpsQueueCache.setNodeIndex(i+1); break; } } }
//緩存當前系統狀態參數,用於業務計算 public class GpsQueueCache { //最近一次從數據庫拉取數據的時間 private static Long lastTime = 0L; //當前服務器節點順序位 private static Integer nodeIndex=1; //當前服務器節點總量,可用於計算當前服務器處理得數據片斷,根據節點偏移nodeIndex劃分 private static Integer nodeCount=1;
說明:排序
一、客戶端啓動後鏈接Zookeeper,獲取當前節點註冊的序列號;接口
二、計算本身在集羣中的當前位置;get
三、根據位置信息作本身該作的事情。it