zookeeper系列(二)實戰master選舉

master選舉

考慮7*24小時向外提供服務的系統,不能有單點故障,於是我們使用集羣,採用的是Master+Slave。集羣中有一臺主機和多臺備機,由主機向外提供服務,備機監聽主機狀態,一旦主機宕機,備機必須迅速接管主機繼續向外提供服務。在這個過程中,從備機選出一臺機作爲主機的過程,就是Master選舉。

架構圖

clipboard.png

左邊是ZooKeeper集羣,右邊是3臺工作服務器。工作服務器啓動時,會去ZooKeeper的Servers節點下創建臨時節點,並把基本信息寫入臨時節點。這個過程叫服務註冊,系統中的其他服務可以通過獲取Servers節點的子節點列表,來瞭解當前系統哪些服務器可用,這個過程叫做服務發現。接着這些服務器會嘗試創建Master臨時節點,誰創建成功誰就是Master,其他的兩臺就作爲Slave。所有的Work Server必須關注Master節點的刪除事件。通過監聽Master節點的刪除事件,來瞭解Master服務器是否宕機(創建臨時節點的服務器一旦宕機,它所創建的臨時節點即會自動刪除)。一旦Master服務器宕機,必須開始新一輪的Master選舉。

流程圖

clipboard.png

核心類圖

clipboard.png

WorkServer對應架構圖的WorkServer,是主工作類;
RunningData用來描述WorkServer的基本信息;
LeaderSelectorZkClient作爲調度器來啓動和停止WorkServer。

代碼實現

/**
 * Basic information describing a work server: a numeric id and a name.
 *
 * <p>Implements {@link Serializable} with a fixed {@code serialVersionUID}.
 * Both properties are {@code null} until their setter has been called.
 */
public class RunningData implements Serializable {

    private static final long serialVersionUID = 4260577459043203630L;

    /** Numeric identifier of the server. */
    private Long cid;

    /** Name of the server. */
    private String name;

    /**
     * @return the server's numeric identifier, or {@code null} if none was set
     */
    public Long getCid() {
        return this.cid;
    }

    /**
     * @return the server's name, or {@code null} if none was set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param cid the numeric identifier to store
     */
    public void setCid(Long cid) {
        this.cid = cid;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }

}
/**
 * A work server that competes with its peers for the master role.
 *
 * <p>The server that manages to create the ephemeral node {@code /master}
 * is the master. Every server subscribes to that node and takes part in a
 * new election round when the node is deleted.
 */
public class WorkServer {

    // True from start() until stop().
    private volatile boolean running = false;

    private ZkClient zkClient;
    // Path of the ZooKeeper node that represents mastership.
    private static final String MASTER_PATH = "/master";
    // Listener that reacts to deletion of the master node.
    private IZkDataListener dataListener;
    // Information describing this server.
    private RunningData serverData;
    // Last known information about the cluster's master. Volatile because it
    // is written by the caller of start()/stop(), by tasks running on
    // delayExector and by listener callbacks, which need not share a thread.
    private volatile RunningData masterData;

    private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
    // Seconds a server that was not the previous master waits before competing.
    private int delayTime = 5;

    /**
     * Creates a work server described by the given data. A {@code ZkClient}
     * must be supplied through {@link #setZkClient} before {@link #start()}.
     *
     * @param rd information describing this server
     */
    public WorkServer(RunningData rd) {
        this.serverData = rd;
        this.dataListener = new IZkDataListener() {

            public void handleDataDeleted(String dataPath) throws Exception {
                RunningData previousMaster = masterData;
                if (previousMaster != null && isSelf(previousMaster)) {
                    // This server was the previous master: compete right away.
                    takeMaster();
                } else {
                    // Otherwise wait before competing. This rides out network
                    // jitter and gives the previous master priority, avoiding
                    // unnecessary data migration.
                    scheduleDelayedTake();
                }
            }

            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                // Data changes on the master node do not affect the election.
            }
        };
    }

    public ZkClient getZkClient() {
        return zkClient;
    }

    public void setZkClient(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    /**
     * Starts the server: subscribes to the master node and competes for mastership.
     *
     * @throws Exception an {@link IllegalStateException} if the server is
     *         already running or no {@code ZkClient} has been set
     */
    public void start() throws Exception {
        if (running) {
            throw new IllegalStateException("server has startup...");
        }
        if (zkClient == null) {
            // Fail with a clear message instead of a bare NullPointerException.
            throw new IllegalStateException("zkClient has not been set");
        }
        running = true;
        // Subscribe to deletion events of the master node.
        zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
        // Compete for mastership.
        takeMaster();

    }

    /**
     * Stops the server: cancels the subscription, shuts the scheduler down
     * and gives up mastership if this server holds it.
     *
     * @throws Exception an {@link IllegalStateException} if the server is not running
     */
    public void stop() throws Exception {
        if (!running) {
            throw new IllegalStateException("server has stoped");
        }
        running = false;

        delayExector.shutdown();
        // Cancel the subscription on the master node.
        zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
        // Give up mastership.
        releaseMaster();

    }

    // Schedules one takeMaster() attempt after delayTime seconds.
    private void scheduleDelayedTake() {
        if (!running) {
            return;
        }
        try {
            delayExector.schedule(new Runnable() {
                public void run() {
                    takeMaster();
                }
            }, delayTime, TimeUnit.SECONDS);
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // stop() shut the scheduler down between the running check and
            // schedule(); the server is stopping, so there is nothing to do.
        }
    }

    // Tries to become master by creating the ephemeral master node.
    private void takeMaster() {
        if (!running) {
            return;
        }

        try {
            // Try to create the ephemeral master node.
            zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
            masterData = serverData;
            System.out.println(serverData.getName()+" is master");

            // For demonstration, give mastership up again after 5 seconds.
            // releaseMaster() verifies ownership itself, so no separate
            // checkMaster() call is needed here.
            delayExector.schedule(new Runnable() {
                public void run() {
                    releaseMaster();
                }
            }, 5, TimeUnit.SECONDS);

        } catch (ZkNodeExistsException e) { // another server created the node first
            // Read who the master is.
            RunningData runningData = zkClient.readData(MASTER_PATH, true);
            if (runningData == null) {
                // The node vanished between create() and readData(): compete again.
                takeMaster();
            } else {
                masterData = runningData;
            }
        } catch (Exception e) {
            // Best effort: a failed attempt must not kill the calling thread,
            // but it should not pass unnoticed either.
            System.err.println(serverData.getName() + " failed to take master: " + e);
        }

    }

    // Deletes the master node if this server currently owns it.
    // NOTE(review): the ownership check and the delete are two separate
    // ZooKeeper calls, so they are not atomic.
    private void releaseMaster() {
        if (checkMaster()) {
            zkClient.delete(MASTER_PATH);
        }
    }

    // Returns whether the given data names this server.
    private boolean isSelf(RunningData other) {
        String otherName = other.getName();
        return otherName != null && otherName.equals(serverData.getName());
    }

    // Reads the master node and reports whether this server is the master.
    private boolean checkMaster() {
        try {
            RunningData eventData = zkClient.readData(MASTER_PATH);
            masterData = eventData;
            // A node without data cannot identify this server.
            return eventData != null && isSelf(eventData);
        } catch (ZkNoNodeException e) {
            return false; // the node does not exist, so this server is not master
        } catch (ZkInterruptedException e) {
            // Keep the interrupt visible to the caller and give up. With the
            // flag set an immediate retry would be interrupted again, so the
            // former recursive retry could recurse without bound.
            Thread.currentThread().interrupt();
            return false;
        } catch (ZkException e) {
            return false;
        }
    }

}
/**
 * Demo driver: starts {@code CLIENT_QTY} work servers, each with its own
 * {@code ZkClient}, waits for Enter on standard input, then stops the
 * servers and closes the clients.
 *
 * <p>Usage: {@code LeaderSelectorZkClient [zkServers]}. When no non-blank
 * first argument is given, the built-in default address is used.
 */
public class LeaderSelectorZkClient {

    // Number of work servers to start.
    private static final int        CLIENT_QTY = 10;
    // Default ZooKeeper address, used when none is passed on the command line.
    private static final String     ZOOKEEPER_SERVER = "192.168.1.105:2181";


    public static void main(String[] args) throws Exception {
        // Allow the ZooKeeper address to be overridden by the first argument.
        String zkServers = ZOOKEEPER_SERVER;
        if (args != null && args.length > 0 && args[0] != null && args[0].trim().length() > 0) {
            zkServers = args[0].trim();
        }

        // Every ZkClient that was created, so it can be closed on exit.
        List<ZkClient>  clients = new ArrayList<ZkClient>();
        // Every work server that was created, so it can be stopped on exit.
        List<WorkServer>  workServers = new ArrayList<WorkServer>();

        try {
            for ( int i = 0; i < CLIENT_QTY; ++i ) { // simulate CLIENT_QTY servers
                // Create the client for this server.
                ZkClient client = new ZkClient(zkServers, 5000, 5000, new SerializableSerializer());
                clients.add(client);
                // Describe this server.
                RunningData runningData = new RunningData();
                runningData.setCid(Long.valueOf(i));
                runningData.setName("Client #" + i);
                // Create the server. It is registered before start() so that
                // the finally block also stops a server whose start() failed.
                WorkServer  workServer = new WorkServer(runningData);
                workServer.setZkClient(client);

                workServers.add(workServer);
                workServer.start();
            }

            System.out.println("敲回車鍵退出!\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();

        } finally {

            System.out.println("Shutting down...");

            // Stop every server; one failure must not prevent the others from stopping.
            for ( WorkServer workServer : workServers ) {
                try {
                    workServer.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // Close the clients only after all servers were stopped.
            for ( ZkClient client : clients ) {
                try {
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

/**
 * Basic information describing a work server: a numeric id and a name.
 *
 * <p>Implements {@link Serializable} with a fixed {@code serialVersionUID}.
 * Both properties are {@code null} until their setter has been called.
 */
public class RunningData implements Serializable {

    private static final long serialVersionUID = 4260577459043203630L;

    /** Numeric identifier of the server. */
    private Long cid;

    /** Name of the server. */
    private String name;

    /**
     * @return the server's numeric identifier, or {@code null} if none was set
     */
    public Long getCid() {
        return this.cid;
    }

    /**
     * @return the server's name, or {@code null} if none was set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param cid the numeric identifier to store
     */
    public void setCid(Long cid) {
        this.cid = cid;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }

}
/**
 * A work server that competes with its peers for the master role.
 *
 * <p>The server that manages to create the ephemeral node {@code /master}
 * is the master. Every server subscribes to that node and takes part in a
 * new election round when the node is deleted.
 */
public class WorkServer {

    // True from start() until stop().
    private volatile boolean running = false;

    private ZkClient zkClient;
    // Path of the ZooKeeper node that represents mastership.
    private static final String MASTER_PATH = "/master";
    // Listener that reacts to deletion of the master node.
    private IZkDataListener dataListener;
    // Information describing this server.
    private RunningData serverData;
    // Last known information about the cluster's master. Volatile because it
    // is written by the caller of start()/stop(), by tasks running on
    // delayExector and by listener callbacks, which need not share a thread.
    private volatile RunningData masterData;

    private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
    // Seconds a server that was not the previous master waits before competing.
    private int delayTime = 5;

    /**
     * Creates a work server described by the given data. A {@code ZkClient}
     * must be supplied through {@link #setZkClient} before {@link #start()}.
     *
     * @param rd information describing this server
     */
    public WorkServer(RunningData rd) {
        this.serverData = rd;
        this.dataListener = new IZkDataListener() {

            public void handleDataDeleted(String dataPath) throws Exception {
                RunningData previousMaster = masterData;
                if (previousMaster != null && isSelf(previousMaster)) {
                    // This server was the previous master: compete right away.
                    takeMaster();
                } else {
                    // Otherwise wait before competing. This rides out network
                    // jitter and gives the previous master priority, avoiding
                    // unnecessary data migration.
                    scheduleDelayedTake();
                }
            }

            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                // Data changes on the master node do not affect the election.
            }
        };
    }

    public ZkClient getZkClient() {
        return zkClient;
    }

    public void setZkClient(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    /**
     * Starts the server: subscribes to the master node and competes for mastership.
     *
     * @throws Exception an {@link IllegalStateException} if the server is
     *         already running or no {@code ZkClient} has been set
     */
    public void start() throws Exception {
        if (running) {
            throw new IllegalStateException("server has startup...");
        }
        if (zkClient == null) {
            // Fail with a clear message instead of a bare NullPointerException.
            throw new IllegalStateException("zkClient has not been set");
        }
        running = true;
        // Subscribe to deletion events of the master node.
        zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
        // Compete for mastership.
        takeMaster();

    }

    /**
     * Stops the server: cancels the subscription, shuts the scheduler down
     * and gives up mastership if this server holds it.
     *
     * @throws Exception an {@link IllegalStateException} if the server is not running
     */
    public void stop() throws Exception {
        if (!running) {
            throw new IllegalStateException("server has stoped");
        }
        running = false;

        delayExector.shutdown();
        // Cancel the subscription on the master node.
        zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
        // Give up mastership.
        releaseMaster();

    }

    // Schedules one takeMaster() attempt after delayTime seconds.
    private void scheduleDelayedTake() {
        if (!running) {
            return;
        }
        try {
            delayExector.schedule(new Runnable() {
                public void run() {
                    takeMaster();
                }
            }, delayTime, TimeUnit.SECONDS);
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // stop() shut the scheduler down between the running check and
            // schedule(); the server is stopping, so there is nothing to do.
        }
    }

    // Tries to become master by creating the ephemeral master node.
    private void takeMaster() {
        if (!running) {
            return;
        }

        try {
            // Try to create the ephemeral master node.
            zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
            masterData = serverData;
            System.out.println(serverData.getName()+" is master");

            // For demonstration, give mastership up again after 5 seconds.
            // releaseMaster() verifies ownership itself, so no separate
            // checkMaster() call is needed here.
            delayExector.schedule(new Runnable() {
                public void run() {
                    releaseMaster();
                }
            }, 5, TimeUnit.SECONDS);

        } catch (ZkNodeExistsException e) { // another server created the node first
            // Read who the master is.
            RunningData runningData = zkClient.readData(MASTER_PATH, true);
            if (runningData == null) {
                // The node vanished between create() and readData(): compete again.
                takeMaster();
            } else {
                masterData = runningData;
            }
        } catch (Exception e) {
            // Best effort: a failed attempt must not kill the calling thread,
            // but it should not pass unnoticed either.
            System.err.println(serverData.getName() + " failed to take master: " + e);
        }

    }

    // Deletes the master node if this server currently owns it.
    // NOTE(review): the ownership check and the delete are two separate
    // ZooKeeper calls, so they are not atomic.
    private void releaseMaster() {
        if (checkMaster()) {
            zkClient.delete(MASTER_PATH);
        }
    }

    // Returns whether the given data names this server.
    private boolean isSelf(RunningData other) {
        String otherName = other.getName();
        return otherName != null && otherName.equals(serverData.getName());
    }

    // Reads the master node and reports whether this server is the master.
    private boolean checkMaster() {
        try {
            RunningData eventData = zkClient.readData(MASTER_PATH);
            masterData = eventData;
            // A node without data cannot identify this server.
            return eventData != null && isSelf(eventData);
        } catch (ZkNoNodeException e) {
            return false; // the node does not exist, so this server is not master
        } catch (ZkInterruptedException e) {
            // Keep the interrupt visible to the caller and give up. With the
            // flag set an immediate retry would be interrupted again, so the
            // former recursive retry could recurse without bound.
            Thread.currentThread().interrupt();
            return false;
        } catch (ZkException e) {
            return false;
        }
    }

}
/**
 * Demo driver: starts {@code CLIENT_QTY} work servers, each with its own
 * {@code ZkClient}, waits for Enter on standard input, then stops the
 * servers and closes the clients.
 *
 * <p>Usage: {@code LeaderSelectorZkClient [zkServers]}. When no non-blank
 * first argument is given, the built-in default address is used.
 */
public class LeaderSelectorZkClient {

    // Number of work servers to start.
    private static final int        CLIENT_QTY = 10;
    // Default ZooKeeper address, used when none is passed on the command line.
    private static final String     ZOOKEEPER_SERVER = "192.168.1.105:2181";


    public static void main(String[] args) throws Exception {
        // Allow the ZooKeeper address to be overridden by the first argument.
        String zkServers = ZOOKEEPER_SERVER;
        if (args != null && args.length > 0 && args[0] != null && args[0].trim().length() > 0) {
            zkServers = args[0].trim();
        }

        // Every ZkClient that was created, so it can be closed on exit.
        List<ZkClient>  clients = new ArrayList<ZkClient>();
        // Every work server that was created, so it can be stopped on exit.
        List<WorkServer>  workServers = new ArrayList<WorkServer>();

        try {
            for ( int i = 0; i < CLIENT_QTY; ++i ) { // simulate CLIENT_QTY servers
                // Create the client for this server.
                ZkClient client = new ZkClient(zkServers, 5000, 5000, new SerializableSerializer());
                clients.add(client);
                // Describe this server.
                RunningData runningData = new RunningData();
                runningData.setCid(Long.valueOf(i));
                runningData.setName("Client #" + i);
                // Create the server. It is registered before start() so that
                // the finally block also stops a server whose start() failed.
                WorkServer  workServer = new WorkServer(runningData);
                workServer.setZkClient(client);

                workServers.add(workServer);
                workServer.start();
            }

            System.out.println("敲回車鍵退出!\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();

        } finally {

            System.out.println("Shutting down...");

            // Stop every server; one failure must not prevent the others from stopping.
            for ( WorkServer workServer : workServers ) {
                try {
                    workServer.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // Close the clients only after all servers were stopped.
            for ( ZkClient client : clients ) {
                try {
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

zookeeper系列(一)zookeeper必知
zookeeper系列(二)實戰master選舉
zookeeper系列(三)實戰數據發佈訂閱
zookeeper系列(四)實戰負載均衡
zookeeper系列(五)實戰分佈式鎖
zookeeper系列(六)實戰分佈式隊列
zookeeper系列(七)實戰分佈式命名服務
zookeeper系列(八)zookeeper運維架構

master選舉

考慮7*24小時向外提供服務的系統,不能有單點故障,於是我們使用集羣,採用的是Master+Slave。集羣中有一臺主機和多臺備機,由主機向外提供服務,備機監聽主機狀態,一旦主機宕機,備機必須迅速接管主機繼續向外提供服務。在這個過程中,從備機選出一臺機作爲主機的過程,就是Master選舉。

架構圖

clipboard.png

左邊是ZooKeeper集羣,右邊是3臺工作服務器。工作服務器啓動時,會去ZooKeeper的Servers節點下創建臨時節點,並把基本信息寫入臨時節點。這個過程叫服務註冊,系統中的其他服務可以通過獲取Servers節點的子節點列表,來瞭解當前系統哪些服務器可用,這個過程叫做服務發現。接着這些服務器會嘗試創建Master臨時節點,誰創建成功誰就是Master,其他的兩臺就作爲Slave。所有的Work Server必須關注Master節點的刪除事件。通過監聽Master節點的刪除事件,來瞭解Master服務器是否宕機(創建臨時節點的服務器一旦宕機,它所創建的臨時節點即會自動刪除)。一旦Master服務器宕機,必須開始新一輪的Master選舉。

流程圖

clipboard.png

核心類圖

clipboard.png

WorkServer對應架構圖的WorkServer,是主工作類;
RunningData用來描述WorkServer的基本信息;
LeaderSelectorZkClient作爲調度器來啓動和停止WorkServer。

代碼實現

/**
 * Basic information describing a work server: a numeric id and a name.
 *
 * <p>Implements {@link Serializable} with a fixed {@code serialVersionUID}.
 * Both properties are {@code null} until their setter has been called.
 */
public class RunningData implements Serializable {

    private static final long serialVersionUID = 4260577459043203630L;

    /** Numeric identifier of the server. */
    private Long cid;

    /** Name of the server. */
    private String name;

    /**
     * @return the server's numeric identifier, or {@code null} if none was set
     */
    public Long getCid() {
        return this.cid;
    }

    /**
     * @return the server's name, or {@code null} if none was set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param cid the numeric identifier to store
     */
    public void setCid(Long cid) {
        this.cid = cid;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }

}
/**
 * A work server that competes with its peers for the master role.
 *
 * <p>The server that manages to create the ephemeral node {@code /master}
 * is the master. Every server subscribes to that node and takes part in a
 * new election round when the node is deleted.
 */
public class WorkServer {

    // True from start() until stop().
    private volatile boolean running = false;

    private ZkClient zkClient;
    // Path of the ZooKeeper node that represents mastership.
    private static final String MASTER_PATH = "/master";
    // Listener that reacts to deletion of the master node.
    private IZkDataListener dataListener;
    // Information describing this server.
    private RunningData serverData;
    // Last known information about the cluster's master. Volatile because it
    // is written by the caller of start()/stop(), by tasks running on
    // delayExector and by listener callbacks, which need not share a thread.
    private volatile RunningData masterData;

    private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
    // Seconds a server that was not the previous master waits before competing.
    private int delayTime = 5;

    /**
     * Creates a work server described by the given data. A {@code ZkClient}
     * must be supplied through {@link #setZkClient} before {@link #start()}.
     *
     * @param rd information describing this server
     */
    public WorkServer(RunningData rd) {
        this.serverData = rd;
        this.dataListener = new IZkDataListener() {

            public void handleDataDeleted(String dataPath) throws Exception {
                RunningData previousMaster = masterData;
                if (previousMaster != null && isSelf(previousMaster)) {
                    // This server was the previous master: compete right away.
                    takeMaster();
                } else {
                    // Otherwise wait before competing. This rides out network
                    // jitter and gives the previous master priority, avoiding
                    // unnecessary data migration.
                    scheduleDelayedTake();
                }
            }

            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                // Data changes on the master node do not affect the election.
            }
        };
    }

    public ZkClient getZkClient() {
        return zkClient;
    }

    public void setZkClient(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    /**
     * Starts the server: subscribes to the master node and competes for mastership.
     *
     * @throws Exception an {@link IllegalStateException} if the server is
     *         already running or no {@code ZkClient} has been set
     */
    public void start() throws Exception {
        if (running) {
            throw new IllegalStateException("server has startup...");
        }
        if (zkClient == null) {
            // Fail with a clear message instead of a bare NullPointerException.
            throw new IllegalStateException("zkClient has not been set");
        }
        running = true;
        // Subscribe to deletion events of the master node.
        zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
        // Compete for mastership.
        takeMaster();

    }

    /**
     * Stops the server: cancels the subscription, shuts the scheduler down
     * and gives up mastership if this server holds it.
     *
     * @throws Exception an {@link IllegalStateException} if the server is not running
     */
    public void stop() throws Exception {
        if (!running) {
            throw new IllegalStateException("server has stoped");
        }
        running = false;

        delayExector.shutdown();
        // Cancel the subscription on the master node.
        zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
        // Give up mastership.
        releaseMaster();

    }

    // Schedules one takeMaster() attempt after delayTime seconds.
    private void scheduleDelayedTake() {
        if (!running) {
            return;
        }
        try {
            delayExector.schedule(new Runnable() {
                public void run() {
                    takeMaster();
                }
            }, delayTime, TimeUnit.SECONDS);
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // stop() shut the scheduler down between the running check and
            // schedule(); the server is stopping, so there is nothing to do.
        }
    }

    // Tries to become master by creating the ephemeral master node.
    private void takeMaster() {
        if (!running) {
            return;
        }

        try {
            // Try to create the ephemeral master node.
            zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
            masterData = serverData;
            System.out.println(serverData.getName()+" is master");

            // For demonstration, give mastership up again after 5 seconds.
            // releaseMaster() verifies ownership itself, so no separate
            // checkMaster() call is needed here.
            delayExector.schedule(new Runnable() {
                public void run() {
                    releaseMaster();
                }
            }, 5, TimeUnit.SECONDS);

        } catch (ZkNodeExistsException e) { // another server created the node first
            // Read who the master is.
            RunningData runningData = zkClient.readData(MASTER_PATH, true);
            if (runningData == null) {
                // The node vanished between create() and readData(): compete again.
                takeMaster();
            } else {
                masterData = runningData;
            }
        } catch (Exception e) {
            // Best effort: a failed attempt must not kill the calling thread,
            // but it should not pass unnoticed either.
            System.err.println(serverData.getName() + " failed to take master: " + e);
        }

    }

    // Deletes the master node if this server currently owns it.
    // NOTE(review): the ownership check and the delete are two separate
    // ZooKeeper calls, so they are not atomic.
    private void releaseMaster() {
        if (checkMaster()) {
            zkClient.delete(MASTER_PATH);
        }
    }

    // Returns whether the given data names this server.
    private boolean isSelf(RunningData other) {
        String otherName = other.getName();
        return otherName != null && otherName.equals(serverData.getName());
    }

    // Reads the master node and reports whether this server is the master.
    private boolean checkMaster() {
        try {
            RunningData eventData = zkClient.readData(MASTER_PATH);
            masterData = eventData;
            // A node without data cannot identify this server.
            return eventData != null && isSelf(eventData);
        } catch (ZkNoNodeException e) {
            return false; // the node does not exist, so this server is not master
        } catch (ZkInterruptedException e) {
            // Keep the interrupt visible to the caller and give up. With the
            // flag set an immediate retry would be interrupted again, so the
            // former recursive retry could recurse without bound.
            Thread.currentThread().interrupt();
            return false;
        } catch (ZkException e) {
            return false;
        }
    }

}
/**
 * Demo driver: starts {@code CLIENT_QTY} work servers, each with its own
 * {@code ZkClient}, waits for Enter on standard input, then stops the
 * servers and closes the clients.
 *
 * <p>Usage: {@code LeaderSelectorZkClient [zkServers]}. When no non-blank
 * first argument is given, the built-in default address is used.
 */
public class LeaderSelectorZkClient {

    // Number of work servers to start.
    private static final int        CLIENT_QTY = 10;
    // Default ZooKeeper address, used when none is passed on the command line.
    private static final String     ZOOKEEPER_SERVER = "192.168.1.105:2181";


    public static void main(String[] args) throws Exception {
        // Allow the ZooKeeper address to be overridden by the first argument.
        String zkServers = ZOOKEEPER_SERVER;
        if (args != null && args.length > 0 && args[0] != null && args[0].trim().length() > 0) {
            zkServers = args[0].trim();
        }

        // Every ZkClient that was created, so it can be closed on exit.
        List<ZkClient>  clients = new ArrayList<ZkClient>();
        // Every work server that was created, so it can be stopped on exit.
        List<WorkServer>  workServers = new ArrayList<WorkServer>();

        try {
            for ( int i = 0; i < CLIENT_QTY; ++i ) { // simulate CLIENT_QTY servers
                // Create the client for this server.
                ZkClient client = new ZkClient(zkServers, 5000, 5000, new SerializableSerializer());
                clients.add(client);
                // Describe this server.
                RunningData runningData = new RunningData();
                runningData.setCid(Long.valueOf(i));
                runningData.setName("Client #" + i);
                // Create the server. It is registered before start() so that
                // the finally block also stops a server whose start() failed.
                WorkServer  workServer = new WorkServer(runningData);
                workServer.setZkClient(client);

                workServers.add(workServer);
                workServer.start();
            }

            System.out.println("敲回車鍵退出!\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();

        } finally {

            System.out.println("Shutting down...");

            // Stop every server; one failure must not prevent the others from stopping.
            for ( WorkServer workServer : workServers ) {
                try {
                    workServer.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // Close the clients only after all servers were stopped.
            for ( ZkClient client : clients ) {
                try {
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
相關文章
相關標籤/搜索