客戶端是開發人員使用Zookeeper的主要的途徑,如下內容將對Zookeeper的內部原理進行詳細的學習和講解。ZooKeeper的客戶端主要有一下幾個核心組件組成:服務器
客戶端的總體架構以下:網絡
下面使用具體的實例結合源碼來分析Zookeeper源碼建立的過程:以下代碼是一個單例的ZooKeeperSupport能夠用來回去Zookeeper客戶端對象:session
1 public class ZookeeperSupport { 2 private static volatile ZooKeeper zooKeeper = null; // zookeeper鏈接,在初始化zk配置時設置 3 public static final Integer zooKeeperLock = new Integer(1); 4 public static boolean isUseZk = true; // 是否使用zk,默認使用,當zk鏈接發生異常時再也不使用 5 public static final long ZK_CONNECT_TIMEOUT = 1L; //zk鏈接的超時時間設置,單位爲秒 6 7 public static ZooKeeper getZooKeeper() { 8 // 若是zookeeper爲null 或者鏈接不可用,則從新獲取鏈接,通常狀況下,不會觸發 9 if (zooKeeper == null || !zooKeeper.getState().isAlive()) { 10 synchronized (zooKeeperLock) { 11 // 若是發現zk再也不使用,則再也不建立新的zk,直接返回 12 if (isUseZk) { 13 if (zooKeeper == null || !zooKeeper.getState().isAlive()) { 14 try { 15 zooKeeper = createNewZookeper(); 16 } catch (Exception e) { 17 Constant.log_cron.error("[initZkConfig] error happen where new zookeeper", e); 18 } 19 } 20 } 21 } 22 } 23 return zooKeeper; 24 } 25 26 public static void setZooKeeper(ZooKeeper zooKeeper) { 27 ZookeeperSupport.zooKeeper = zooKeeper; 28 } 29 30 /** 31 * zookeeper啓動時,異步啓動兩個線程,因此new以後並不表明鏈接已經創建,此時若是調用zk的一些方法會拋ConnectionLoss的異常 32 * 爲了不這種狀況,封裝new方法,每次new的時候去等待鏈接已經創建才作後面的步驟 33 * 34 * @return 35 * @throws Exception 36 */ 37 public static ZooKeeper createNewZookeper() throws Exception { 38 CountDownLatch connectedLatch = new CountDownLatch(1); 39 ZooKeeper zooKeeper = new ZooKeeper(ZKConfig.getInstance().getConnectUrl(), ZKConfig.getInstance().getTimeout(), new DefaultWatcher(connectedLatch)); 40 if (States.CONNECTING == zooKeeper.getState()) { 41 boolean ret = connectedLatch.await(ZK_CONNECT_TIMEOUT, TimeUnit.SECONDS); 42 // 若是等待超時了,尚未收到鏈接成功的通知,則說明zk不可用,直接不用zk,並報警 43 if(!ret){ 44 isUseZk = false; 45 } 46 } 47 return zooKeeper; 48 } 49 }
爲了使用Zookeeper服務,必需建立一個Zookeeper類的對象。在建立Zookeeper類的對象時客戶端Session的創建是一個異步的過程,構造方法可能會在回話完成創建完成前當即返回,構造方法中的Watcher就是處理鏈接狀態通知的接口。下面給出了DefaultWatcher實現:架構
1 public class DefaultWatcher implements Watcher { 2 private CountDownLatch connectedLatch; 3 public DefaultWatcher(CountDownLatch connectedLatch) { 4 this.connectedLatch = connectedLatch; 5 } 6 // 監控全部被觸發的事件 7 @Override 8 public void process(WatchedEvent event) { 9 if (connectedLatch != null && event.getState() == KeeperState.SyncConnected) { 10 connectedLatch.countDown(); 11 } 12 } 13 }
Zookeeper類一共有9個構造函數,具體參數的意義以下:app
由上面的實例可知,在建立Zookeeper對象時最終調用了以下的構造函數:dom
1 能夠看到上面的實例中最終調用了這個構造方法: 2 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, 3 boolean canBeReadOnly, HostProvider aHostProvider, 4 ZKClientConfig clientConfig) throws IOException { 5 if (clientConfig == null) { 6 clientConfig = new ZKClientConfig(); 7 } 8 this.clientConfig = clientConfig; 9 //1.初始化watcherManger 10 watchManager = defaultWatchManager(); 11 //2.爲watchManager設置設置默認的Watcher 12 watchManager.defaultWatcher = watcher; 13 //3.解析服務器串 14 ConnectStringParser connectStringParser = new ConnectStringParser( 15 connectString); 16 hostProvider = aHostProvider; 17 //4.建立ClientCnxn對象,並啓動 18 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), 19 hostProvider, sessionTimeout, this, watchManager, 20 getClientCnxnSocket(), canBeReadOnly); 21 cnxn.start(); 22 }
根據如上源碼可知在初始化Zookeeper對象時主要作了三件事情:異步
下面針對上面三個步驟注意分析。WatchManager主要負責管理客戶端註冊的Wathcr。首先看看 defaultWatchManager()方法,socket
1 protected ZKWatchManager defaultWatchManager() { 2 return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)); 3 }
該方法建立了一個ZKWatchManager對象, ZKWatchManager實現了ClientWatchManager接口,ClientWatchManager接口只有一個materialize()方法,該方法根據keeperState、eventType和path返回應該被通知的Watcher集合。其聲明以下:ide
public interface ClientWatchManager { public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path); }
接下來看看ZKWatchManager的實現,在ZKWatchManager中包含了五個屬性:函數
1 private final Map<String, Set<Watcher>> dataWatches =new HashMap<String, Set<Watcher>>(); 2 private final Map<String, Set<Watcher>> existWatches =new HashMap<String, Set<Watcher>>(); 3 private final Map<String, Set<Watcher>> childWatches =new HashMap<String, Set<Watcher>>(); 4 private boolean disableAutoWatchReset;//用於禁止在Client重連是在服務端重建watch 5 protected volatile Watcher defaultWatcher;//默認的watcher
在ZKWatchManager中最重要的方法是materialize()方法,下面結合源碼進行分析:
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type,String clientPath){ //用於存儲返回結果 Set<Watcher> result = new HashSet<Watcher>(); //根據EventType進行不一樣的操做 switch (type) { case None: //將defaultWatcher返回 result.add(defaultWatcher); //若是KeeperState不是SyncConnected,而且disableAutoWatchReset爲true返回全部的watcher,並清空 boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected; synchronized(dataWatches) { for(Set<Watcher> ws: dataWatches.values()) { result.addAll(ws); } if (clear) { dataWatches.clear(); } } synchronized(existWatches) { for(Set<Watcher> ws: existWatches.values()) { result.addAll(ws); } if (clear) { existWatches.clear(); } } synchronized(childWatches) { for(Set<Watcher> ws: childWatches.values()) { result.addAll(ws); } if (clear) { childWatches.clear(); } } return result; //若是EventType是NodeDataChanged或者NodeCreated,將dataWatches和existWatches case NodeDataChanged: case NodeCreated: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { addTo(existWatches.remove(clientPath), result); } break; //若是EventType是NodeChildrenChanged,將childWatches返回 case NodeChildrenChanged: synchronized (childWatches) { addTo(childWatches.remove(clientPath), result); } break; //若是EventType是NodeDeleted,將dataWatches返回 case NodeDeleted: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { Set<Watcher> list = existWatches.remove(clientPath); if (list != null) { addTo(existWatches.remove(clientPath), result); } } synchronized (childWatches) { addTo(childWatches.remove(clientPath), result); } break; default: throw new RuntimeException(msg); } return result; } }
在看了ZKWatcherManager代碼以後,那麼產生一個疑問Watcher是在何時添加到ZKWatcherManager中的,以Zookeeper接口中的getData()爲例:
public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx){ … //在此處建立了WatchRegistration對象 WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } … //調用clientCnxn的queuePacket方法 cnxn.queuePacket(h,newReplyHeader(),request,response,cb,clientPath,serverPath, ctx, wcb); }
從上面能夠看到在getData()方法中中建立了一個DataWatchRegistration對象,接下來再分析一下DataWatchRegistration。DataWatchRegistration繼承了WatchRegistration類,WatchRegistration有一個抽象方法以下:
1 abstract protected Map<String, Set<Watcher>> getWatches(int rc);
該方法從ZKWatcherManager中獲取一個合適的Map。除此以外還有個register方法,真正的向ZKWatcherManager中註冊Watcher,其具體代碼以下:
public void register(int rc) { if (shouldAddWatch(rc)) { Map<String, Set<Watcher>> watches = getWatches(rc); synchronized(watches) { Set<Watcher> watchers = watches.get(clientPath); if (watchers == null) { watchers = new HashSet<Watcher>(); watches.put(clientPath, watchers); } watchers.add(watcher); } } }
如今再看一下DataWatchRegistration中是如何實現getWatches(int rc)方法:
protected Map<String, Set<Watcher>> getWatches(int rc) { return watchManager.dataWatches; }
在DataWatchRegistration中直接返回了watchManager.dataWatches register()方法在finishPacket會調用。
在Zookeeper的構造函數中,建立並啓動ClientCnxn的代碼以下:
cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start();
在構造方法中調用的getClientCnxnSocket()方法,該方法根據系統配置建立一個ClientCnxnSocket對象,具體代碼以下:
1 private ClientCnxnSocket getClientCnxnSocket() throws IOException { 2 String clientCnxnSocketName = getClientConfig().getProperty( 3 ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET); 4 //默認使用ClientCnxnSocketNIO 5 if (clientCnxnSocketName == null) { 6 clientCnxnSocketName = ClientCnxnSocketNIO.class.getName(); 7 } 8 try { 9 //反射獲取構造函數 10 Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName). 11 getDeclaredConstructor(ZKClientConfig.class); 12 //建立對象 13 ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor. 14 newInstance(getClientConfig()); 15 return clientCxnSocket; 16 } catch (Exception e) { 17 IOException ioe = new IOException("Couldn't instantiate " 18 + clientCnxnSocketName); 19 ioe.initCause(e); 20 throw ioe; 21 } 22 }
接下來看一下ClientCnxn的構造方法:
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { … connectTimeout = sessionTimeout / hostProvider.size(); readTimeout = sessionTimeout * 2 / 3; … //初始化sendThread和EventThread sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); this.clientConfig=zooKeeper.getClientConfig(); }
關於sendThread和EventThread暫時先不分析,接下來看看ClientCnxn的start()方法,該方法主要用於啓動sendThread線程和eventThread線程。
1 public void start() { 2 sendThread.start(); 3 eventThread.start(); 4 }
EventThread:主要用於處理Zookeeper客戶端的各類事件,須要注意的是EventThread是一個守護線程。在EventThread內部主要包含如下幾個屬性:
1 //保存一個待處理的時間的隊列 2 final LinkedBlockingQueue<Object> waitingEvents =new LinkedBlockingQueue<Object>(); 3 private volatile KeeperState sessionState = KeeperState.Disconnected; 4 private volatile boolean wasKilled = false;// 判斷EventThread是否被殺掉 5 private volatile boolean isRunning = false;//判斷EventThread是否還在運行
同時在EventThread內部有幾個方法將不一樣待處理事件添加到waitingEvents,這些方法咱們暫時不作分析。接下來看看EventThread的run()方法:
1 public void run() { 2 try { 3 isRunning = true; 4 while (true) { 5 //從任務隊列中取出待處理任務 6 Object event = waitingEvents.take(); 7 if (event == eventOfDeath) { 8 wasKilled = true; 9 } else { 10 //處理事務 11 processEvent(event); 12 } 13 if (wasKilled) 14 synchronized (waitingEvents) { 15 if (waitingEvents.isEmpty()) { 16 isRunning = false; 17 break; 18 } 19 } 20 } 21 } catch (InterruptedException e) { 22 … 23 } 24 … 25 }
processEvent()方法比較簡單,就是調用相應的對象執行相應的處理。
SendThread主要負責客戶端與服務器端的IO和心跳消息。SendThread主要包含如下四個屬性:
private long lastPingSentNs;//記錄上一次心跳發送時間 private final ClientCnxnSocket clientCnxnSocket;//在ClientCnxn構造時傳入的 private Random r = new Random(System.nanoTime()); private boolean isFirstConnect = true;
SendThread的構造方法以下:
SendThread(ClientCnxnSocket clientCnxnSocket) { uper(makeThreadName("-SendThread()")); state = States.CONNECTING;//將ClientCnxn中state由Not_connected設置爲CONNECTING this.clientCnxnSocket = clientCnxnSocket; etDaemon(true);//設置爲守護線程 }
接下來看看SendThread的run方法,其中這段代碼比較長先進行逐一分析:
clientCnxnSocket.introduce(this, sessionId, outgoingQueue); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = Time.currentElapsedTime(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
接下來進入While循環,在循環的第一部分判斷socket鏈接是否創建,若是沒有創建就創建鏈接,改代碼主要以下
if (!clientCnxnSocket.isConnected()) { // don't re-establish connection if we are closing if (closing) { break; } startConnect(); lientCnxnSocket.updateLastSendAndHeard(); }
進入startConnect繼續跟蹤,發現startConnect()最終調用了ClientCnxnSocketNIO的connect方法,在connect()方法內部先調用了createSock()方法建立一個Sockect對象,其具體實現以下:
SocketChannel createSock() throws IOException { SocketChannel sock; sock = SocketChannel.open(); sock.configureBlocking(false); sock.socket().setSoLinger(false, -1); sock.socket().setTcpNoDelay(true); return sock; }
接下來connect()方法繼續調用registerAndConnect,該方法真正的向服務器端創建鏈接:
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException { sockKey = sock.register(selector, SelectionKey.OP_CONNECT); boolean immediateConnect = sock.connect(addr); if (immediateConnect) { sendThread.primeConnection(); } }
能夠看到在registerAndConnect方法中又調用了SendThread的primeConnection()方法,在primeConnection()方法中主要初始化Session、Watch和權限信息,同時註冊ClientCnxnSocketNIO對讀時間和寫時間的監聽。繼續回到SendThread的run()方法。接下來繼續判斷鏈接狀態,若是是state.isConnected()會進行一系列的操做,其中最重要的是調用sendPing()方法和clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);,再此主要分析一下doTransport()方法,
1 void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) 2 throws IOException, InterruptedException { 3 selector.select(waitTimeOut); 4 Set<SelectionKey> selected; 5 synchronized (this) { 6 selected = selector.selectedKeys(); 7 } 8 updateNow(); 9 for (SelectionKey k : selected) { 10 SocketChannel sc = ((SocketChannel) k.channel()); 11 //若是是鏈接事件 12 if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { 13 if (sc.finishConnect()) { 14 updateLastSendAndHeard(); 15 updateSocketAddresses(); 16 sendThread.primeConnection(); 17 } 18 } 19 //若是是讀寫事件 20 else f((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { 21 doIO(pendingQueue, cnxn); 22 } 23 } 24 if (sendThread.getZkState().isConnected()) { 25 if (findSendablePacket(outgoingQueue, 26 sendThread.tunnelAuthInProgress()) != null) { 27 enableWrite(); 28 } 29 } 30 selected.clear(); 31 }
能夠看到最重要的方法是doIO(),在doIO()方法中主要進行讀寫操做.繼續回到SendThread的run方法,看看run()方法在結束時作什麼工做,在run()方法,跳出while循環時代碼以下
synchronized (state) { // When it comes to this point, it guarantees that later queued // packet to outgoingQueue will be notified of death. cleanup(); } //調用selector.close() clientCnxnSocket.close(); if (state.isAlive()) { //添加Disconnected事件 eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); }
在SendThread的run()結束前很重要的一步操做是調用cleanup()方法:
1 private void cleanup() { 2 //關閉網絡鏈接 3 clientCnxnSocket.cleanup(); 4 synchronized (pendingQueue) { 5 //遍歷pendingQueue,執行conLossPacket 6 for (Packet p : pendingQueue) { 7 conLossPacket(p); 8 } 9 //清除pendingQueue 10 pendingQueue.clear(); 11 } 12 // We can't call outgoingQueue.clear() here because 13 // between iterating and clear up there might be new 14 // packets added in queuePacket(). 15 Iterator<Packet> iter = outgoingQueue.iterator(); 16 while (iter.hasNext()) { 17 Packet p = iter.next(); 18 conLossPacket(p); 19 iter.remove(); 20 } 21 }
在cleanUp方法中最主要的是循環和遍歷pendingQueue和outgoingQueue,並針對兩個隊列中每個Packet調用conLossPacket(p)方法,最後清空兩個隊列,如今具體看一看conLossPacket(p)中具體作了什麼事情,在conLossPacket(p)主要調用了finishPacket(p),如今進finishPacket(p)方法進行分析:
private void finishPacket(Packet p) { int err = p.replyHeader.getErr(); //watcher的註冊於取消註冊 …. //判斷是否有異步的回調,若是沒有將finished設置爲true,喚醒全部等待的事件 if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { //有異步回調,將finished設置爲true,並將packet加入到EventThread的隊列 p.finished = true; eventThread.queuePacket(p); } }
至此真個Zookeeper鏈接的創建過程就完成了。