全部的節點向zk的某個路徑下註冊,建立臨時節點(臨時節點,zookeeper會主動監控,一旦連接失效,zk會刪除該臨時節點),每一個註冊者建立時會有一個編號,每次選舉編號最小的爲主節點,其餘節點就爲從節點,從節點會監控主節點是否失效(怎麼監控? zk有事件,監聽事件的狀態變化,然後重新選舉),爲避免「驚羣」現象,每一個節點只監控比它小的一個臨近節點。
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * Created by on 17/11/23. */ public class ChooseMaster implements Watcher { private ZooKeeper zk=null; private String selfPath=null; private String waitPath=null; private static final String ZK_ROOT_PATH="/zkmaster"; //選主從的跟路徑 private static final String ZK_SUB_PATH=ZK_ROOT_PATH+"/register"; private CountDownLatch successCountDownLatch=new CountDownLatch(1); private CountDownLatch threadCompleteLatch=null; public ChooseMaster(CountDownLatch countDownLatch){ this.threadCompleteLatch=countDownLatch; } @Override public void process(WatchedEvent watchedEvent) { //監聽事件 Event.KeeperState keeperState=watchedEvent.getState(); Event.EventType eventType=watchedEvent.getType(); if(Event.KeeperState.SyncConnected==keeperState){ //創建鏈接 if(Event.EventType.None==eventType){ System.out.println(Thread.currentThread().getName()+" connected to server"); successCountDownLatch.countDown(); }else if(Event.EventType.NodeDeleted==eventType && watchedEvent.getPath().equals(waitPath)){ //監測到節點刪除,且爲當前線程的等待節點 System.out.println(Thread.currentThread().getName() + " some node was deleted,I'll check if I am the minimum node"); try{ if(checkMinPath()){ //判斷本身是否是最小的編號 processMasterEvent(); //處理主節點作的事情 } }catch (Exception e){ e.printStackTrace(); } } }else if(Event.KeeperState.Disconnected==keeperState){ //鏈接斷開 System.out.println(Thread.currentThread().getName()+ " release connection"); }else if(Event.KeeperState.Expired==keeperState){ //超時 System.out.println(Thread.currentThread().getName()+ " connection expire"); } } public void chooseMaster() throws Exception { selfPath=zk.create(ZK_SUB_PATH,null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //建立臨時節點 System.out.println(Thread.currentThread().getName()+ "create path "+selfPath); if(checkMinPath()){ //判斷是否爲主節點 processMasterEvent(); } } 
public boolean createPersistPath(String path,String data,boolean needWatch) throws KeeperException, InterruptedException { if(zk.exists(path,needWatch)==null){ zk.create(path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); System.out.println(Thread.currentThread().getName()+" create persist path "+path); } return true; } public void createConnection(String connection,int timeout) throws IOException, InterruptedException { zk=new ZooKeeper(connection,timeout,this); successCountDownLatch.await(); } private void processMasterEvent() throws KeeperException, InterruptedException { if(zk.exists(selfPath,false)==null){ System.out.println(Thread.currentThread().getName()+ " selfnode is not exist "+ selfPath); return; } System.out.println(Thread.currentThread().getName()+ " I'm the master,now do work"); Thread.sleep(2000); System.out.println(Thread.currentThread().getName()+" Finish do work,leave master"); //zk.delete(selfPath,-1); releaseConnection(); threadCompleteLatch.countDown(); } private void releaseConnection() { if(zk!=null){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } private boolean checkMinPath() throws Exception {
//獲取根節點下的全部子節點,進行排序,取當前路徑的index,若是排在第一個,則爲主,不然檢測前一個節點是否存在,不存在則從新選舉最小的節點 List<String> subNodes=zk.getChildren(ZK_ROOT_PATH,false); System.out.println(subNodes.toString()); Collections.sort(subNodes); System.out.println(Thread.currentThread().getName()+" tmp node index is "+selfPath.substring(ZK_ROOT_PATH.length()+1)); int index=subNodes.indexOf(selfPath.substring(ZK_ROOT_PATH.length()+1)); switch (index){ case -1: System.out.println(Thread.currentThread().getName()+" create node is not exist"); return false; case 0: System.out.println(Thread.currentThread().getName()+" I'm the master"); return true; default: waitPath=ZK_ROOT_PATH+"/"+subNodes.get(index-1); System.out.println(Thread.currentThread().getName()+" the node before me is "+waitPath); try{ zk.getData(waitPath,true,new Stat()); return false; }catch (Exception e){ if(zk.exists(waitPath,false)==null){ System.out.println(Thread.currentThread().getName()+" the node before me is not exist,now is me"); return checkMinPath(); }else{ throw e; } } } } }
import org.apache.zookeeper.KeeperException;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * Demo driver for {@code ChooseMaster}: starts {@code THREAD_NUM} workers,
 * each with its own ZooKeeper session, lets them take turns as master, and
 * waits until every worker has reported completion.
 *
 * Created by on 17/11/23.
 */
public class MasterChoiceTest {

    private final static String ZK_CONNECT_STRING = "";
    private final static String ZK_ROOT_PATH = "/zkmaster";
    private final static int SESSION_TIMEOUT = 10000;
    private static final int THREAD_NUM = 5;

    /** Counter used only to number the worker threads' names. */
    private static int threadNo = 0;
    private static ExecutorService executorService = null;
    /** Reaches zero once every worker has either finished as master or failed. */
    private static CountDownLatch threadCompleteLatch = new CountDownLatch(THREAD_NUM);

    public static void main(String[] args) {
        executorService = Executors.newFixedThreadPool(THREAD_NUM, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                String name = String.format("The %s thread", ++threadNo);
                Thread worker = new Thread(Thread.currentThread().getThreadGroup(), r, name, 0);
                worker.setDaemon(false);
                return worker;
            }
        });
        if (executorService != null) {
            startProcess();
        }
    }

    /**
     * Submits one election task per worker thread, then blocks until the
     * completion latch reaches zero.
     */
    private static void startProcess() {
        Runnable masterChoiceTest = new Runnable() {
            @Override
            public void run() {
                String threadName = Thread.currentThread().getName();
                ChooseMaster chooseMaster = new ChooseMaster(threadCompleteLatch);
                try {
                    chooseMaster.createConnection(ZK_CONNECT_STRING, SESSION_TIMEOUT);
                    System.out.println(Thread.currentThread().getName() + " connected to server");
                    // Only one worker at a time checks for and creates the shared root node.
                    synchronized (MasterChoiceTest.class) {
                        chooseMaster.createPersistPath(ZK_ROOT_PATH, "thread " + threadName, true);
                    }
                    chooseMaster.chooseMaster();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    reportFailure(e);
                } catch (Exception e) {
                    reportFailure(e);
                }
            }
        };

        for (int i = 0; i < THREAD_NUM; i++) {
            executorService.execute(masterChoiceTest);
        }
        executorService.shutdown();

        try {
            threadCompleteLatch.await();
            System.out.println("All thread finished");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Reports a worker failure and counts the worker as finished, so that the
     * main thread's wait on the completion latch cannot block forever because
     * of a worker that will never become master.
     */
    private static void reportFailure(Exception e) {
        e.printStackTrace();
        threadCompleteLatch.countDown();
    }
}
The 1 thread-EventThread connected to server
The 5 thread-EventThread connected to server
The 3 thread-EventThread connected to server
The 2 thread-EventThread connected to server
The 5 thread connected to server
The 3 thread connected to server
The 2 thread connected to server
The 1 thread connected to server
The 4 thread-EventThread connected to server
The 4 thread connected to server
The 5 thread create persist path /zkmaster
The 5 threadcreate path /zkmaster/register0000000000
The 5 thread tmp node index is register0000000000
The 5 thread I'm the master
The 1 threadcreate path /zkmaster/register0000000001
The 2 threadcreate path /zkmaster/register0000000002
The 5 thread I'm the master,now do work
The 3 threadcreate path /zkmaster/register0000000003
[register0000000000, register0000000002, register0000000001]
The 1 thread tmp node index is register0000000001
The 1 thread the node before me is /zkmaster/register0000000000
[register0000000000, register0000000002, register0000000001, register0000000003]
[register0000000000, register0000000002, register0000000001, register0000000003]
The 3 thread tmp node index is register0000000003
The 2 thread tmp node index is register0000000002
The 3 thread the node before me is /zkmaster/register0000000002
The 2 thread the node before me is /zkmaster/register0000000001
The 4 threadcreate path /zkmaster/register0000000004
[register0000000000, register0000000002, register0000000001, register0000000004, register0000000003]
The 4 thread tmp node index is register0000000004
The 4 thread the node before me is /zkmaster/register0000000003
The 5 thread Finish do work,leave master
The 1 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000002, register0000000001, register0000000004, register0000000003]
The 1 thread-EventThread tmp node index is register0000000001
The 1 thread-EventThread I'm the master
The 1 thread-EventThread I'm the master,now do work
The 1 thread-EventThread Finish do work,leave master
The 2 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000002, register0000000004, register0000000003]
The 2 thread-EventThread tmp node index is register0000000002
The 2 thread-EventThread I'm the master
The 2 thread-EventThread I'm the master,now do work
The 2 thread-EventThread Finish do work,leave master
The 3 thread-EventThread some node was deleted,I'll check if I am the minimum node
[register0000000004, register0000000003]
The 3 thread-EventThread tmp node index is register0000000003
The 3 thread-EventThread I'm the master
The 3 thread-EventThread I'm the master,now do work
The 3 thread-EventThread Finish do work,leave master
The 4 thread-EventThread some node was deleted,I'll check if I am the minimum node
The 4 thread-EventThread tmp node index is register0000000004
The 4 thread-EventThread I'm the master
The 4 thread-EventThread I'm the master,now do work
The 4 thread-EventThread Finish do work,leave master
All thread finished