筆者爲何要講ZooKeeper的源碼,對於程序員來講,光知道用是成爲不了優秀的行家的,還要知道因此然,只有知道了內部實現機制,才能開拓眼界提升自我。而筆者認爲ZooKeeper是最好的入門分佈式系統的敲門磚。node
好很少說,咱們這裏先看看客戶端是怎麼運轉的。程序員
一、概述算法
ZooKeeper客戶端是鏈接到服務端集羣,獲取zk節點數據,監聽zk節點數據變化的。zk節點就是znode,它是相似文件路徑的東西,每一個znode能夠設置它的文本內容,當znode的文本內容被其餘客戶端修改後,全部監聽該znode的客戶端都會實時被通知到,這樣的方式實現了分佈式一致性存儲。緩存
在客戶端裏有一個ZooKeeper類,注意這裏特指類名稱。客戶端經過Zookeeper類來發送命令給Zookeeper服務器。服務器
ZooKeeper類中還能夠設置Watcher,這就是znode監聽者。Watcher能夠指定監聽哪一個znode,當Zookeeper集羣的znode節點狀態發生變化時,服務端會發送通知消息給客戶端的Watcher。網絡
Watcher又能夠細分爲3種Watcher子類:DataWatcher,ExistWatcher和ChildWatcher。根據字面意思就能猜得出來,DataWatcher是znode的數據變化時觸發,ExistWatcher是znode的建立刪除時觸發,ChildWatcher是在znode下建立子目錄(也是znode)時觸發。實際生產環境中用的最多的仍是DataWatcher。session
下面咱們先分析分析ZooKeeper類的實現,至於Watcher的實現後面會有專門介紹。異步
二、通訊機制分佈式
客戶端與服務端交互的數據流大體以下:ide
三、ZooKeeper類
客戶端在構造函數階段建立ClientCnxn 與服務端鏈接,後續命令都經過ClientCntx發送給服務端。ClientCnxn是客戶端與服務端通訊的底層接口,它和ClientCnxnSocketNetty一塊兒工做提供網絡通訊服務。
服務端是ZooKeeperServer類,收到ClientCnxn的Request請求包後調用相應的處理邏輯,返回結果再經過ClientCnxc發送給客戶端。
ClientCntx鏈接時能夠同時指定多臺服務器地址,根據必定的算法挑選某一個服務器地址進行鏈接,當某個服務器發生故障沒法鏈接時,會自動鏈接其餘的服務器。實現這一機制的是HostProdiver接口,實現用StaticHostProvider類。
ZooKeeper類的構造函數以下:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException { if (clientConfig == null) { clientConfig = new ZKClientConfig(); } this.clientConfig = clientConfig; watchManager = defaultWatchManager(); watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser(connectString); hostProvider = aHostProvider; cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); }
這裏的connectString是鏈接字符串,aHostProvider是管理服務端列表的。watcher是監聽器。
爲何有aHostProvider?客戶端能夠配置多個服務端地址,這樣當某個服務端掛掉的時候,客戶端會自動嘗試鏈接其餘的服務端,實現分佈式可靠性。
建立了ZooKeeper對象後就能夠調用具體的讀寫數據的方法了,下面列舉常見方法的實現機制。
create方法根據輸入參數構造出CreateRequest包,而後經過submitRequest方法傳遞給服務端,submitRequest方法將CreateRequest轉換成Packet包並調用sendPacket方法將發送包放入隊列,等待發送線程發送給服務端。
服務端響應完成後會將返回結果填充到CreateResponse實體中返回給客戶端。
四、發送命令
咱們選取getData方法,來看看客戶端的內部機制,其餘命令的處理過程是相似的,不一樣的只是命令類型不一樣而已。
getData方法從服務端讀取znode的數據,入參同時包括watcher,這樣在znode數據被其餘客戶端修改後,會實時回調watcher來使得全部客戶端同步本次變化。
先給出getData的代碼:
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath); WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
這裏幹了幾件事情呢?主要乾了兩件事。
(1)註冊watcher,這個很好理解,至於watcher的細節會在其餘文章裏專欄敘說。
(2)構建完整的znode的路徑名,從根目錄開始。而後將znode的路徑名和GetDataRequest類型打包放到ClientCnxn的發送隊列裏,等待排隊發往服務端。
其餘命令的處理過程是相似的,不一樣的只是命令類型不一樣而已,對應到代碼裏是不一樣的Request對象。getData命令對應GetDataRequest類;Exists方法對應ExistsRequest類。他們的父類倒是同一個。ZooKeeper支持的Request類主要有如下這些:
對於create命令來講,和GetData有一點不一樣。不一樣點在於如下兩點:
(1)create命令是當即返回結果的,而getData等其餘命令是異步返回結果的。getData入參裏的DataCallback參數就是異步回調處理方法。
(2)create是調用ClientCnxn的submitRequest方法啓動發送命令過程,而getData等其餘方法是調用ClientCnxn的queuePacket方法將請求命令緩存在隊列裏,等待發送線程異步發送。
五、ClientCnxn
前面咱們看到ZooKeeper類的命令發送都是經過ClientCnxn類實現的。這裏就談談ClientCnxn類幹了哪些活。
Clientcnxn將客戶端請求加入發送隊列,等待sendThread發送。eventThread負責處理Server返回的WatchedEvent,以及回調註冊的客戶端事件接口處理函數。
5.1 queuePacket
這是ClientCnxn裏最重要的一個方法,它將請求包放入發送隊列outgoingQueue中,等待發送線程發送給服務端。
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration atchRegistration, WatchDeregistration watchDeregistration) { Packet packet = null; packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; packet.watchDeregistration = watchDeregistration; synchronized (state) { if (!state.isAlive() || closing) { conLossPacket(packet); } else { if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } } sendThread.getClientCnxnSocket().packetAdded(); return packet; }
最後告訴SendThread數據已經放好了,至於什麼時候發送就等SendThread本身來決定了。
5.2 submitRequest
提交客戶端請求到服務端,這是當即返回的方法,若是請求包沒處理完則一直等待下去。submitRequest方法主要用在create命令。
ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration); synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r;
5.3 sendPacket
sendPacket構建Packet,而後調用發送線程SendThread裏的同名sendPacket方法來發送數據到服務端。
public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode) throws IOException { int xid = getXid(); RequestHeader h = new RequestHeader(); h.setXid(xid); h.setType(opCode); ReplyHeader r = new ReplyHeader(); r.setXid(xid); Packet p = new Packet(h, r, request, response, null, false); p.cb = cb; sendThread.sendPacket(p); }
5.4 finishPacket
該方法在接收到服務端的響應時,喚醒等待響應的客戶端線程,經過調用Packet的notifyAll方法來喚醒wait在該Packet上的線程。
若是客戶端請求指定了Watcher,則同時生成WatchedEvent事件並放入事件隊列,等待eventThread線程處理。
代碼片斷:
private void finishPacket(Packet p) { if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } }
5.5 readResponse
ClientCnxnSocketNetty收到服務端響應後觸發ZKClientHandler的messageReceived事件,在該事件處理邏輯中調用sendThread的readResponse方法獲取服務端響應。
若是服務端響應的是WatchedEvent事件,則將事件放入eventThread中等候調度執行事件方法。
若是服務端響應的是客戶端命令結果,則將Packet從發送隊列刪除,最後調用CientCnxn的finishPacket方法完成最後的處理,finishPacket方法在前面已經講過了。
readResponse的主要代碼以下:
void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); if (replyHdr.getXid() == -1) { // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); } WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return; } Packet packet; try { packet.replyHeader.setXid(replyHdr.getXid()); packet.replyHeader.setErr(replyHdr.getErr()); packet.replyHeader.setZxid(replyHdr.getZxid()); if (replyHdr.getZxid() > 0) { lastZxid = replyHdr.getZxid(); } } finally { finishPacket(packet); } }