ZooKeeper系列之(十一):客戶端實現機制

筆者爲何要講ZooKeeper的源碼,對於程序員來講,光知道用是成爲不了優秀的行家的,還要知道因此然,只有知道了內部實現機制,才能開拓眼界提升自我。而筆者認爲ZooKeeper是最好的入門分佈式系統的敲門磚。node

好很少說,咱們這裏先看看客戶端是怎麼運轉的。程序員

一、概述算法

ZooKeeper客戶端是鏈接到服務端集羣,獲取zk節點數據,監聽zk節點數據變化的。zk節點就是znode,它是相似文件路徑的東西,每一個znode能夠設置它的文本內容,當znode的文本內容被其餘客戶端修改後,全部監聽該znode的客戶端都會實時被通知到,這樣的方式實現了分佈式一致性存儲。緩存

在客戶端裏有一個ZooKeeper類,注意這裏特指類名稱。客戶端經過Zookeeper類來發送命令給Zookeeper服務器。服務器

ZooKeeper類中還能夠設置Watcher,這就是znode監聽者。Watcher能夠指定監聽哪一個znode,當Zookeeper集羣的znode節點狀態發生變化時,服務端會發送通知消息給客戶端的Watcher。網絡

Watcher又能夠細分爲3種Watcher子類:DataWatcher,ExistWatcher和ChildWatcher。根據字面意思就能猜得出來,DataWatcher是znode的數據變化時觸發,ExistWatcher是znode的建立刪除時觸發,ChildWatcher是在znode下建立子目錄(也是znode)時觸發。實際生產環境中用的最多的仍是DataWatcher。session

下面咱們先分析分析ZooKeeper類的實現,至於Watcher的實現後面會有專門介紹。異步

二、通訊機制分佈式

客戶端與服務端交互的數據流大體以下:ide

 

  1. 首先是客戶端ZooKeeper類發起命令請求,而後經過ClientCntx發送給服務端集羣。ClientCnxn是上層類,屏蔽了具體的網絡通訊流程,網絡經過是ClientCntxSocketNetty實現的,服務端是ZooKeeperServer。
  2. 以create命令(建立znode)爲例,ZooKeeper類會構造Packet,將請求數據封裝在Packet裏。而後調用ClientCnxn的submitRequest方法。ClientCnxn的submitRequest方法調用queuePacket方法將Packet放入outgoingQueue隊列中,而後線程執行wait方法掛起等待服務端返回。
  3. ClientCnxnSocketNetty和ClientCnxn共享同一個outgoingQueue,ClientCnxnSocketNetty啓動了發送守護進程,當outgoingQueue隊列中有Packet時,會自動將該Packet發送給ZooKeeperServer。同時ClientCnxnSocketNetty啓動接收線程實時接收ZooKeeperServer的返回數據,返回數據觸發ClientCnxnSocketNetty中啓動的ZKClientHandler的MessageReceived事件。
  4. 在MessageReceived事件中回調ClientCnxn中的SendThread類的readResponse方法。
  5. readResponse方法中最後調用finishPacket方法喚醒在該Packet上wait的線程,也就是發起submitRequest的方法,使得submitRequest方法返回到ZooKeeper類。
  6. 客戶端請求過程結束。

三、ZooKeeper類

客戶端在構造函數階段建立ClientCnxn 與服務端鏈接,後續命令都經過ClientCntx發送給服務端。ClientCnxn是客戶端與服務端通訊的底層接口,它和ClientCnxnSocketNetty一塊兒工做提供網絡通訊服務。

服務端是ZooKeeperServer類,收到ClientCnxn的Request請求包後調用相應的處理邏輯,返回結果再經過ClientCnxc發送給客戶端。

ClientCntx鏈接時能夠同時指定多臺服務器地址,根據必定的算法挑選某一個服務器地址進行鏈接,當某個服務器發生故障沒法鏈接時,會自動鏈接其餘的服務器。實現這一機制的是HostProdiver接口,實現用StaticHostProvider類。

ZooKeeper類的構造函數以下:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,  boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        hostProvider = aHostProvider;
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
 }

這裏的connectString是鏈接字符串,aHostProvider是管理服務端列表的。watcher是監聽器。

爲何有aHostProvider?客戶端能夠配置多個服務端地址,這樣當某個服務端掛掉的時候,客戶端會自動嘗試鏈接其餘的服務端,實現分佈式可靠性。

建立了ZooKeeper對象後就能夠調用具體的讀寫數據的方法了,下面列舉常見方法的實現機制。

create方法根據輸入參數構造出CreateRequest包,而後經過submitRequest方法傳遞給服務端,submitRequest方法將CreateRequest轉換成Packet包並調用sendPacket方法將發送包放入隊列,等待發送線程發送給服務端。

服務端響應完成後會將返回結果填充到CreateResponse實體中返回給客戶端。

四、發送命令

咱們選取getData方法,來看看客戶端的內部機制,其餘命令的處理過程是相似的,不一樣的只是命令類型不一樣而已。

getData方法從服務端讀取znode的數據,入參同時包括watcher,這樣在znode數據被其餘客戶端修改後,會實時回調watcher來使得全部客戶端同步本次變化。

先給出getData的代碼:

public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)  {
     final String clientPath = path;
     PathUtils.validatePath(clientPath);
     WatchRegistration wcb = null;
     if (watcher != null) {
         wcb = new DataWatchRegistration(watcher, clientPath);
     }
     final String serverPath = prependChroot(clientPath);
     RequestHeader h = new RequestHeader();
     h.setType(ZooDefs.OpCode.getData);
     GetDataRequest request = new GetDataRequest();
     request.setPath(serverPath);
     request.setWatch(watcher != null);
     GetDataResponse response = new GetDataResponse();
     cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                clientPath, serverPath, ctx, wcb);
}

這裏幹了幾件事情呢?主要乾了兩件事。

(1)註冊watcher,這個很好理解,至於watcher的細節會在其餘文章裏專欄敘說。

(2)構建完整的znode的路徑名,從根目錄開始。而後將znode的路徑名和GetDataRequest類型打包放到ClientCnxn的發送隊列裏,等待排隊發往服務端。

其餘命令的處理過程是相似的,不一樣的只是命令類型不一樣而已,對應到代碼裏是不一樣的Request對象。getData命令對應GetDataRequest類;Exists方法對應ExistsRequest類。他們的父類倒是同一個。ZooKeeper支持的Request類主要有如下這些:

  1. create:CreateRequest
  2. delete:DeleteRequest
  3. exists:ExistsRequest
  4. getData:GetDataRequest
  5. setData:SetDataRequest
  6. getChildren:GetChildrenRequest

對於create命令來講,和GetData有一點不一樣。不一樣點在於如下兩點:

(1)create命令是當即返回結果的,而getData等其餘命令是異步返回結果的。getData入參裏的DataCallback參數就是異步回調處理方法。

(2)create是調用ClientCnxn的submitRequest方法啓動發送命令過程,而getData等其餘方法是調用ClientCnxn的queuePacket方法將請求命令緩存在隊列裏,等待發送線程異步發送。

 

五、ClientCnxn

前面咱們看到ZooKeeper類的命令發送都是經過ClientCnxn類實現的。這裏就談談ClientCnxn類幹了哪些活。

Clientcnxn將客戶端請求加入發送隊列,等待sendThread發送。eventThread負責處理Server返回的WatchedEvent,以及回調註冊的客戶端事件接口處理函數。

5.1 queuePacket

這是ClientCnxn裏最重要的一個方法,它將請求包放入發送隊列outgoingQueue中,等待發送線程發送給服務端。

public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
       Record response, AsyncCallback cb, String clientPath,
       String serverPath, Object ctx, WatchRegistration atchRegistration,
            WatchDeregistration watchDeregistration) {
        Packet packet = null;
        packet = new Packet(h, r, request, response, watchRegistration);        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
       
        synchronized (state) {
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                     if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
}

最後告訴SendThread數據已經放好了,至於什麼時候發送就等SendThread本身來決定了。

5.2 submitRequest

提交客戶端請求到服務端,這是當即返回的方法,若是請求包沒處理完則一直等待下去。submitRequest方法主要用在create命令。

ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,  null, watchRegistration, watchDeregistration);
synchronized (packet) {
     while (!packet.finished) {
           packet.wait();
     }
}
return r;

5.3 sendPacket

sendPacket構建Packet,而後調用發送線程SendThread裏的同名sendPacket方法來發送數據到服務端。

public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode)   throws IOException {
        int xid = getXid();
        RequestHeader h = new RequestHeader();
        h.setXid(xid);
        h.setType(opCode);
        ReplyHeader r = new ReplyHeader();
        r.setXid(xid);
        Packet p = new Packet(h, r, request, response, null, false);
        p.cb = cb;
        sendThread.sendPacket(p);
 }

5.4 finishPacket

該方法在接收到服務端的響應時,喚醒等待響應的客戶端線程,經過調用Packet的notifyAll方法來喚醒wait在該Packet上的線程。

若是客戶端請求指定了Watcher,則同時生成WatchedEvent事件並放入事件隊列,等待eventThread線程處理。

代碼片斷:

private void finishPacket(Packet p) {
    if (p.cb == null) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
     } else {
        p.finished = true;
        eventThread.queuePacket(p);
     }
}

5.5 readResponse

ClientCnxnSocketNetty收到服務端響應後觸發ZKClientHandler的messageReceived事件,在該事件處理邏輯中調用sendThread的readResponse方法獲取服務端響應。

若是服務端響應的是WatchedEvent事件,則將事件放入eventThread中等候調度執行事件方法。

若是服務端響應的是客戶端命令結果,則將Packet從發送隊列刪除,最後調用CientCnxn的finishPacket方法完成最後的處理,finishPacket方法在前面已經講過了。

readResponse的主要代碼以下:

void readResponse(ByteBuffer incomingBuffer) throws IOException {
     ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
     BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
     ReplyHeader replyHdr = new ReplyHeader();
     replyHdr.deserialize(bbia, "header");
     if (replyHdr.getXid() == -1) {
          // -1 means notification
           WatcherEvent event = new WatcherEvent();
           event.deserialize(bbia, "response");
        // convert from a server path to a client path
           if (chrootPath != null) {
                String serverPath = event.getPath();
                if(serverPath.compareTo(chrootPath)==0)
                    event.setPath("/");
                else if (serverPath.length() > chrootPath.length())
             event.setPath(serverPath.substring(chrootPath.length()));               
            }
           WatchedEvent we = new WatchedEvent(event);          
           eventThread.queueEvent( we );
           return;
     }
     Packet packet;
     try {        
           packet.replyHeader.setXid(replyHdr.getXid());
           packet.replyHeader.setErr(replyHdr.getErr());
           packet.replyHeader.setZxid(replyHdr.getZxid());
           if (replyHdr.getZxid() > 0) {
                 lastZxid = replyHdr.getZxid();
            }           
      
      } finally {
            finishPacket(packet);
      }
 }
相關文章
相關標籤/搜索