ZooKeeper版本:3.4.10。java
ZooKeeper類是ZooKeeper客戶端的實現,用來發送命令給ZooKeeper服務器。服務器
ZooKeeper中能夠設置Watcher,每一個Watcher在節點狀態發生變化的時候被通知,執行預先註冊的Watcher動做。網絡
ZooKeeper有三種Watcher列表:session
(1)DataWatcher異步
(2)ExistWatcheride
(3)ChildWatcher.this
protected final ClientCnxn cnxn;// 成員變量cnxn,鏈接服務器,經過cnxn發送命令給服務端 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { ......// 打印log watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser( connectString);// 從傳入的服務器地址字符串中解析出服務器地址 HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses());// 提供服務器地址,當服務器發生故障沒法鏈接時,會自動鏈接其它的服務器 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);// 構建和服務器通訊的對象cnxn cnxn.start(); }
ClientCnxn是客戶端和服務端通訊的底層接口,和ClientCnxnSocket一塊兒工做提供網絡通訊服務。spa
調用create在ZooKeeper中建立一個Node,返回值是成功建立的路徑名稱:線程
首先看看 create 方法:code
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create);// 設置操做代碼爲create CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); request.setData(data);// 使用輸入參數構造CreateRequest請求 request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null && acl.size() == 0) { throw new KeeperException.InvalidACLException(); } request.setAcl(acl); ReplyHeader r = cnxn.submitRequest(h, request, response, null);// 將請求提交發送給服務器 if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath();// 從返回的CreateResponse中獲取建立成功後的路徑 } else { return response.getPath().substring(cnxn.chrootPath.length()); } }
在 create 中經過 submitRequest 來提交請求:
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);// 將CreateRequest轉換成Packet包 synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }
queuePacket 將CreateRequest轉換成Packet包:
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { Packet packet = null; // Note that we do not generate the Xid for the packet yet. It is // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(), // where the packet is actually sent. synchronized (outgoingQueue) { packet = new Packet(h, r, request, response, watchRegistration);// 將CreateRequest轉換成Packet包 packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; if (!state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet);// 將發送包放入隊列,等待發送線程發送給服務器 } } sendThread.getClientCnxnSocket().wakeupCnxn(); return packet; }
刪除節點操做,提供同步和異步兩種接口方式:
public void delete(final String path, int version, VoidCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath);// 校驗傳入的路徑是否合法 final String serverPath; // maintain semantics even in chroot case // specifically - root cannot be deleted // I think this makes sense even in chroot case. if (clientPath.equals("/")) { // a bit of a hack, but delete(/) will never succeed and ensures // that the same semantics are maintained serverPath = clientPath; } else { serverPath = prependChroot(clientPath); } RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.delete);// 設置操做代碼爲delete DeleteRequest request = new DeleteRequest(); request.setPath(serverPath);// 使用輸入參數構造DeleteRequest請求 request.setVersion(version); cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath, serverPath, ctx, null);// 和create操做同樣,調用queuePacket方法,將DeleteRequest轉換成Packet包 }
exists:判斷節點是否存在,異步方式。構造ExistsRequest請求對象,設置操做碼ZooDefs.OpCode.exists;
getData:獲取節點關聯數據。構造GetDataRequest請求對象,設置操做碼ZooDefs.OpCode.getData;
setData:設置節點關聯數據。構造SetDataRequest請求對象,設置操做碼ZooDefs.OpCode.setData;
getChildren:獲取子節點路徑列表。構造GetChildrenRequest請求對象,設置操做碼ZooDefs.OpCode.getChildren。