ZooKeeper源碼:ZooKeeper類分析

    ZooKeeper版本:3.4.10。java

一.建立ZooKeeper對象

    ZooKeeper類是ZooKeeper客戶端的實現,用來發送命令給ZooKeeper服務器。服務器

    ZooKeeper中能夠設置Watcher,每一個Watcher在節點狀態發生變化的時候被通知,執行預先註冊的Watcher動做。網絡

    ZooKeeper有三種Watcher列表:session

        (1)DataWatcher異步

        (2)ExistWatcheride

        (3)ChildWatcher.this

protected final ClientCnxn cnxn;// 成員變量cnxn,鏈接服務器,經過cnxn發送命令給服務端
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        ......// 打印log
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);// 從傳入的服務器地址字符串中解析出服務器地址
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());// 提供服務器地址,當服務器發生故障沒法鏈接時,會自動鏈接其它的服務器
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);// 構建和服務器通訊的對象cnxn
        cnxn.start();
    }

    ClientCnxn是客戶端和服務端通訊的底層接口,和ClientCnxnSocket一塊兒工做提供網絡通訊服務。spa

 

二.create操做

    調用create在ZooKeeper中建立一個Node,返回值是成功建立的路徑名稱:線程

    首先看看 create 方法:code

public String create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath, createMode.isSequential());
        final String serverPath = prependChroot(clientPath);
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.create);// 設置操做代碼爲create
        CreateRequest request = new CreateRequest();
        CreateResponse response = new CreateResponse();
        request.setData(data);// 使用輸入參數構造CreateRequest請求
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);
        if (acl != null && acl.size() == 0) {
            throw new KeeperException.InvalidACLException();
        }
        request.setAcl(acl);
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);// 將請求提交發送給服務器
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (cnxn.chrootPath == null) {
            return response.getPath();// 從返回的CreateResponse中獲取建立成功後的路徑
        } else {
            return response.getPath().substring(cnxn.chrootPath.length());
        }
    }

    在 create 中經過 submitRequest 來提交請求:

public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);// 將CreateRequest轉換成Packet包
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }

    queuePacket 將CreateRequest轉換成Packet包:

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;
        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);// 將CreateRequest轉換成Packet包
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);// 將發送包放入隊列,等待發送線程發送給服務器
            }
        }
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }

 

三.delete操做

    刪除節點操做,提供同步和異步兩種接口方式:    

public void delete(final String path, int version, VoidCallback cb,
            Object ctx)
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);// 校驗傳入的路徑是否合法
        final String serverPath;
        // maintain semantics even in chroot case
        // specifically - root cannot be deleted
        // I think this makes sense even in chroot case.
        if (clientPath.equals("/")) {
            // a bit of a hack, but delete(/) will never succeed and ensures
            // that the same semantics are maintained
            serverPath = clientPath;
        } else {
            serverPath = prependChroot(clientPath);
        }
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.delete);// 設置操做代碼爲delete
        DeleteRequest request = new DeleteRequest();
        request.setPath(serverPath);// 使用輸入參數構造DeleteRequest請求
        request.setVersion(version);
        cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath,
                serverPath, ctx, null);// 和create操做同樣,調用queuePacket方法,將DeleteRequest轉換成Packet包
    }

 

四.其餘相似操做

        exists:判斷節點是否存在,異步方式。構造ExistsRequest請求對象,設置操做碼ZooDefs.OpCode.exists;

        getData:獲取節點關聯數據。構造GetDataRequest請求對象,設置操做碼ZooDefs.OpCode.getData;

        setData:設置節點關聯數據。構造SetDataRequest請求對象,設置操做碼ZooDefs.OpCode.setData;

        getChildren:獲取子節點路徑列表。構造GetChildrenRequest請求對象,設置操做碼ZooDefs.OpCode.getChildren。

    看完ZooKeeper類分析,是否是以爲很簡單,都是差很少的套路:構建對應服務器操做的請求對象,打包成Packet,而後等待發送線程把這些發送包發送給服務器。

相關文章
相關標籤/搜索