使用Zookeeper實現集羣中選擇單機器執行任務並自動切換

一、需求背景node

     有一個集羣(N臺服務器),須要訪問一個外部接口獲取數據,可是每一個時間點只須要一個機器來執行任務就好了,不須要全部服務器同時執行,由於集羣共享數據庫。數據庫

二、思路緩存

   經過Zookeeper選取鏈接序列號最小的服務器做爲當前時間執行任務的機器,若是當前機器宕機,經過Zookeeper自動發現機制,從新選擇新的執行機器,除非整個集羣掛掉;服務器

  (固然,改思路可讓集羣處理同一個任務集的不一樣部分,如N1處理1...N,N2處理N+1....N2等)ide

三、代碼this

//鏈接管理客戶端
public class ZkClientManager implements Watcher,InitializingBean {
    private Logger log = LoggerFactory.getLogger(getClass());

    private final String ROOT_PATH = "/car_position";
    private final String SUB_PATH = ROOT_PATH + "/sub_system_";

    @Value("${zookeeper_address}")
    public String CONNECT_PATH = "127.0.0.1:2181";
    private Integer SESSION_TIME_OUT = 10000;

    private ZooKeeper zk = null;
    private String SELF_PATH = null;

    private boolean createRootPath(String data) throws KeeperException, InterruptedException {
        if (zk.exists(ROOT_PATH, true) == null) {
            String rootpath = zk.create(ROOT_PATH, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            log.info("------#建立根節點成功rootpath:" + rootpath);
        } else {
            log.info("------#根路徑節點已經存在,不在重複建立");
        }
        return true;
    }

    private boolean createSubNode() throws KeeperException, InterruptedException {
        SELF_PATH = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        log.info("------#建立當前zk節點路徑成功,path=" + SELF_PATH);
        List<String> paths = zk.getChildren(ROOT_PATH, new ZkNodeWatcher(zk, SELF_PATH));
        WangKuoGpsUtil.sortAndCalcPath(SELF_PATH,paths);
        return true;
    }

    private void deletePath() {
        try {
            zk.delete(SELF_PATH, -1);
            log.info("-------#刪除PATH=" + SELF_PATH);
        } catch (Exception e) {
            log.error("------#刪除zk節點路徑異常",e);
        }
    }

    private void releaseConn() {
        if (zk != null) {
            try {
                zk.close();
            } catch (InterruptedException e) {
                log.error("------#zookeeper關閉鏈接異常");
            }
        }
    }

    public void process(WatchedEvent event) {
        Event.KeeperState state = event.getState();
        Event.EventType type = event.getType();
        if (Event.KeeperState.SyncConnected == state) {
            if (Event.EventType.None == type) {
                log.info("------#成功鏈接上zk服務器");
            }
        } else if (Event.KeeperState.Disconnected == state) {
            log.info("------#與ZK服務器斷開鏈接");
        } else if (Event.KeeperState.Expired == state) {
            log.info("會話失效");
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("------#Zookeeper鏈接服務器地址{},timeout={}",CONNECT_PATH,SESSION_TIME_OUT);
        zk = new ZooKeeper(CONNECT_PATH, SESSION_TIME_OUT, this);
        log.info("------#初始化zookeeper鏈接成功");
        createRootPath("");
        createSubNode();
    }
}
//監聽器,接收節點變動通知,收到通知後從新計算
public class ZkNodeWatcher implements Watcher {

    private Logger log = LoggerFactory.getLogger(getClass());
    private ZooKeeper zooKeeper;
    private String selfPath;

    public ZkNodeWatcher(ZooKeeper zooKeeper,String selfPath){
        this.zooKeeper = zooKeeper;
        this.selfPath =selfPath;
    }

    public void process(WatchedEvent event) {
        Event.KeeperState state = event.getState();
        Event.EventType type = event.getType();
        try {
            log.info("------#Zookeeper NodeWather,state={},type={}",state,type);
            if(Event.EventType.NodeChildrenChanged == type){
                List<String> paths = zooKeeper.getChildren(event.getPath(), this);
                WangKuoGpsUtil.sortAndCalcPath(selfPath,paths);
            }
        } catch (Exception e) {
           log.error("------#處理時間異常",e);
        }
    }
}

 

//對服務器在Zookeeper上的須要進行排序,並計算出當前節點的排序位
public class WangKuoGpsUtil {
    private static Logger log = LoggerFactory.getLogger(WangKuoGpsUtil.class);
     public static void sortAndCalcPath(String nodePath,List<String> paths) {
       log.info(" ------#nodePath={} 監聽節點--->{} ",nodePath,JSONObject.toJSON(paths));
        Collections.sort(paths);
        GpsQueueCache.setNodeCount(paths.size());
        GpsQueueCache.setLastTime(0L);//強制從數據庫中拉取
        for(int i=0;i<paths.size();i++){
            if(nodePath.contains(paths.get(i))){
                GpsQueueCache.setNodeIndex(i+1);
                break;
            }
        }
    }

 

//緩存當前系統狀態參數,用於業務計算
public class GpsQueueCache {

    //最近一次從數據庫拉取數據的時間
    private  static Long lastTime = 0L;
    //當前服務器節點順序位
    private static Integer nodeIndex=1;
    //當前服務器節點總量,可用於計算當前服務器處理得數據片斷,根據節點偏移nodeIndex劃分
    private static Integer nodeCount=1;

說明:排序

     一、客戶端啓動後鏈接Zookeeper,獲取當前節點註冊的序列號;接口

     二、計算本身在集羣中的當前位置;get

    三、根據位置信息作本身該作的事情。it

相關文章
相關標籤/搜索