Zookeeper客戶端介紹

 客戶端是開發人員使用Zookeeper的主要的途徑,如下內容將對Zookeeper的內部原理進行詳細的學習和講解。ZooKeeper的客戶端主要有一下幾個核心組件組成:服務器

  • Zookeeper:提供客戶端訪問ZooKeeper服務器的API.
  • ClientWatchManager:負責管理客戶端註冊的Watcher.
  • HostProvider:客戶端地址列表管理器。
  • ClientCnxn:客戶端核心線程,其內部包含連個線程及SendThread和EvnentThread。SendThread是一個IO線程主要負責客戶端和服務端之間的網絡通訊;後者是一個事件處理線程,主要負責對服務端時間進行處理。

  客戶端的總體架構以下:網絡

 

實例

  下面使用具體的實例結合源碼來分析Zookeeper源碼建立的過程:以下代碼是一個單例的ZooKeeperSupport能夠用來回去Zookeeper客戶端對象:session

 1 public class ZookeeperSupport {    
 2     private static volatile ZooKeeper zooKeeper = null; // zookeeper鏈接,在初始化zk配置時設置
 3     public static final Integer zooKeeperLock = new Integer(1);
 4     public static boolean isUseZk = true; // 是否使用zk,默認使用,當zk鏈接發生異常時再也不使用
 5     public static final long ZK_CONNECT_TIMEOUT = 1L; //zk鏈接的超時時間設置,單位爲秒
 6     
 7     public static ZooKeeper getZooKeeper() {
 8         // 若是zookeeper爲null 或者鏈接不可用,則從新獲取鏈接,通常狀況下,不會觸發
 9         if (zooKeeper == null || !zooKeeper.getState().isAlive()) {
10             synchronized (zooKeeperLock) {
11                 // 若是發現zk再也不使用,則再也不建立新的zk,直接返回
12                 if (isUseZk) {
13                     if (zooKeeper == null || !zooKeeper.getState().isAlive()) {
14                         try {
15                             zooKeeper = createNewZookeper();
16                         } catch (Exception e) {
17                             Constant.log_cron.error("[initZkConfig] error happen where new zookeeper", e);
18                         }
19                     }
20                 }
21             }
22         }
23         return zooKeeper;
24     }
25     
26     public static void setZooKeeper(ZooKeeper zooKeeper) {
27         ZookeeperSupport.zooKeeper = zooKeeper;
28     }
29     
30     /**
31      * zookeeper啓動時,異步啓動兩個線程,因此new以後並不表明鏈接已經創建,此時若是調用zk的一些方法會拋ConnectionLoss的異常
32      * 爲了不這種狀況,封裝new方法,每次new的時候去等待鏈接已經創建才作後面的步驟
33      * 
34      * @return
35      * @throws Exception
36      */
37     public static ZooKeeper createNewZookeper() throws Exception {
38         CountDownLatch connectedLatch = new CountDownLatch(1);
39         ZooKeeper zooKeeper = new ZooKeeper(ZKConfig.getInstance().getConnectUrl(), ZKConfig.getInstance().getTimeout(), new DefaultWatcher(connectedLatch));    
40         if (States.CONNECTING == zooKeeper.getState()) {
41             boolean ret = connectedLatch.await(ZK_CONNECT_TIMEOUT, TimeUnit.SECONDS);
42             // 若是等待超時了,尚未收到鏈接成功的通知,則說明zk不可用,直接不用zk,並報警
43             if(!ret){
44                 isUseZk = false;
45             }
46         }
47         return zooKeeper;
48     }    
49 }
View Code

  爲了使用Zookeeper服務,必需建立一個Zookeeper類的對象。在建立Zookeeper類的對象時客戶端Session的創建是一個異步的過程,構造方法可能會在回話完成創建完成前當即返回,構造方法中的Watcher就是處理鏈接狀態通知的接口。下面給出了DefaultWatcher實現:架構

 1 public class DefaultWatcher implements Watcher {
 2     private CountDownLatch connectedLatch;
 3     public DefaultWatcher(CountDownLatch connectedLatch) {
 4         this.connectedLatch = connectedLatch;
 5     }
 6     // 監控全部被觸發的事件
 7     @Override
 8     public void process(WatchedEvent event) {
 9         if (connectedLatch != null && event.getState() == KeeperState.SyncConnected) {
10             connectedLatch.countDown();
11         }
12     }
13 }
View Code

 源碼分析

  Zookeeper類一共有9個構造函數,具體參數的意義以下:app

  由上面的實例可知,在建立Zookeeper對象時最終調用了以下的構造函數:dom

 1 能夠看到上面的實例中最終調用了這個構造方法:
 2 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
 3             boolean canBeReadOnly, HostProvider aHostProvider,
 4             ZKClientConfig clientConfig) throws IOException {
 5         if (clientConfig == null) {
 6             clientConfig = new ZKClientConfig();
 7         }
 8         this.clientConfig = clientConfig;
 9         //1.初始化watcherManger
10         watchManager = defaultWatchManager();
11         //2.爲watchManager設置設置默認的Watcher
12         watchManager.defaultWatcher = watcher;
13         //3.解析服務器串
14         ConnectStringParser connectStringParser = new ConnectStringParser(
15                 connectString);
16         hostProvider = aHostProvider;
17         //4.建立ClientCnxn對象,並啓動
18         cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
19                 hostProvider, sessionTimeout, this, watchManager,
20                 getClientCnxnSocket(), canBeReadOnly);
21         cnxn.start();
22 }
View Code

  根據如上源碼可知在初始化Zookeeper對象時主要作了三件事情:異步

  • 初始化ZKWatcherManager
  • 解析服務器串,並初始化hostprovider
  • 初始化並啓動ClientCnxn

1.初始化ZKWatcherManager

  下面針對上面三個步驟注意分析。WatchManager主要負責管理客戶端註冊的Wathcr。首先看看 defaultWatchManager()方法,socket

1  protected ZKWatchManager defaultWatchManager() {
2         return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
3     }
View Code

  該方法建立了一個ZKWatchManager對象, ZKWatchManager實現了ClientWatchManager接口,ClientWatchManager接口只有一個materialize()方法,該方法根據keeperState、eventType和path返回應該被通知的Watcher集合。其聲明以下:ide

public interface ClientWatchManager {
    public Set<Watcher> materialize(Watcher.Event.KeeperState   state,
        Watcher.Event.EventType type, String path);
}
View Code

  接下來看看ZKWatchManager的實現,在ZKWatchManager中包含了五個屬性:函數

1 private final Map<String, Set<Watcher>> dataWatches =new HashMap<String, Set<Watcher>>();
2 private final Map<String, Set<Watcher>> existWatches =new HashMap<String, Set<Watcher>>();
3 private final Map<String, Set<Watcher>> childWatches =new HashMap<String, Set<Watcher>>();
4 private boolean disableAutoWatchReset;//用於禁止在Client重連是在服務端重建watch
5 protected volatile Watcher defaultWatcher;//默認的watcher
View Code 

  在ZKWatchManager中最重要的方法是materialize()方法,下面結合源碼進行分析:

public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type,String clientPath){
            //用於存儲返回結果
            Set<Watcher> result = new HashSet<Watcher>();
            //根據EventType進行不一樣的操做
            switch (type) {
            case None:
                //將defaultWatcher返回
                result.add(defaultWatcher);
                //若是KeeperState不是SyncConnected,而且disableAutoWatchReset爲true返回全部的watcher,並清空
                boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
                synchronized(dataWatches) {
                    for(Set<Watcher> ws: dataWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }
                return result;
            //若是EventType是NodeDataChanged或者NodeCreated,將dataWatches和existWatches
            case NodeDataChanged:
            case NodeCreated:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            //若是EventType是NodeChildrenChanged,將childWatches返回
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            //若是EventType是NodeDeleted,將dataWatches返回
            case NodeDeleted:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        addTo(existWatches.remove(clientPath), result);
                    }
                }
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default:
                throw new RuntimeException(msg);
            }
            return result;
        }
}
View Code

  在看了ZKWatcherManager代碼以後,那麼產生一個疑問Watcher是在何時添加到ZKWatcherManager中的,以Zookeeper接口中的getData()爲例:

public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx){
     …
     //在此處建立了WatchRegistration對象
     WatchRegistration wcb = null;
     if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
     }

        …
     //調用clientCnxn的queuePacket方法
cnxn.queuePacket(h,newReplyHeader(),request,response,cb,clientPath,serverPath, ctx, wcb);
}
View Code

  從上面能夠看到在getData()方法中中建立了一個DataWatchRegistration對象,接下來再分析一下DataWatchRegistration。DataWatchRegistration繼承了WatchRegistration類,WatchRegistration有一個抽象方法以下:

1 abstract protected Map<String, Set<Watcher>> getWatches(int rc);
View Code

  該方法從ZKWatcherManager中獲取一個合適的Map。除此以外還有個register方法,真正的向ZKWatcherManager中註冊Watcher,其具體代碼以下:

 public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
}
View Code

  如今再看一下DataWatchRegistration中是如何實現getWatches(int rc)方法:

protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.dataWatches;
}
View Code

  在DataWatchRegistration中直接返回了watchManager.dataWatches register()方法在finishPacket會調用。

2.ClinetCnxn的建立

  在Zookeeper的構造函數中,建立並啓動ClientCnxn的代碼以下:

cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
View Code 

  在構造方法中調用的getClientCnxnSocket()方法,該方法根據系統配置建立一個ClientCnxnSocket對象,具體代碼以下:

 1 private ClientCnxnSocket getClientCnxnSocket() throws IOException {
 2         String clientCnxnSocketName = getClientConfig().getProperty(
 3                 ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
 4         //默認使用ClientCnxnSocketNIO
 5 if (clientCnxnSocketName == null) {
 6             clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
 7         }
 8         try {
 9             //反射獲取構造函數
10             Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).
11 getDeclaredConstructor(ZKClientConfig.class);
12                 //建立對象
13             ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.
14 newInstance(getClientConfig());
15             return clientCxnSocket;
16         } catch (Exception e) {
17             IOException ioe = new IOException("Couldn't instantiate "
18                     + clientCnxnSocketName);
19             ioe.initCause(e);
20             throw ioe;
21         }
22     }
View Code 

  接下來看一下ClientCnxn的構造方法:

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
      long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        …
        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        …
        //初始化sendThread和EventThread
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();
        this.clientConfig=zooKeeper.getClientConfig();
}
View Code

  關於sendThread和EventThread暫時先不分析,接下來看看ClientCnxn的start()方法,該方法主要用於啓動sendThread線程和eventThread線程。

1 public void start() {
2         sendThread.start();
3         eventThread.start();
4 }
View Code

EventThread

  EventThread:主要用於處理Zookeeper客戶端的各類事件,須要注意的是EventThread是一個守護線程。在EventThread內部主要包含如下幾個屬性: 

1     //保存一個待處理的時間的隊列
2     final LinkedBlockingQueue<Object> waitingEvents =new LinkedBlockingQueue<Object>();
3     private volatile KeeperState sessionState = KeeperState.Disconnected;
4     private volatile boolean wasKilled = false;// 判斷EventThread是否被殺掉
5     private volatile boolean isRunning = false;//判斷EventThread是否還在運行
View Code 

  同時在EventThread內部有幾個方法將不一樣待處理事件添加到waitingEvents,這些方法咱們暫時不作分析。接下來看看EventThread的run()方法:

 1 public void run() {
 2            try {
 3               isRunning = true;
 4               while (true) {
 5                  //從任務隊列中取出待處理任務
 6                  Object event = waitingEvents.take();
 7                  if (event == eventOfDeath) {
 8                     wasKilled = true;
 9                  } else {
10                     //處理事務
11                     processEvent(event);
12                  }
13                  if (wasKilled)
14                     synchronized (waitingEvents) {
15                        if (waitingEvents.isEmpty()) {
16                           isRunning = false;
17                           break;
18                        }
19                     }
20               }
21            } catch (InterruptedException e) {
22 23            }
24 25 }
View Code

  processEvent()方法比較簡單,就是調用相應的對象執行相應的處理。

SendThread 

  SendThread主要負責客戶端與服務器端的IO和心跳消息。SendThread主要包含如下四個屬性:

private long lastPingSentNs;//記錄上一次心跳發送時間
private final ClientCnxnSocket clientCnxnSocket;//在ClientCnxn構造時傳入的
private Random r = new Random(System.nanoTime());        
private boolean isFirstConnect = true;
View Code

  SendThread的構造方法以下: 

SendThread(ClientCnxnSocket clientCnxnSocket) {
    uper(makeThreadName("-SendThread()"));
    state = States.CONNECTING;//將ClientCnxn中state由Not_connected設置爲CONNECTING
    this.clientCnxnSocket = clientCnxnSocket;
    etDaemon(true);//設置爲守護線程
}
View Code 

  接下來看看SendThread的run方法,其中這段代碼比較長先進行逐一分析:  

clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
View Code

  接下來進入While循環,在循環的第一部分判斷socket鏈接是否創建,若是沒有創建就創建鏈接,改代碼主要以下 

if (!clientCnxnSocket.isConnected()) {
    // don't re-establish connection if we are closing
    if (closing) {
      break;
    }
    startConnect();
    lientCnxnSocket.updateLastSendAndHeard();
 }
View Code

  進入startConnect繼續跟蹤,發現startConnect()最終調用了ClientCnxnSocketNIO的connect方法,在connect()方法內部先調用了createSock()方法建立一個Sockect對象,其具體實現以下:

SocketChannel createSock() throws IOException {
        SocketChannel sock;
        sock = SocketChannel.open();
        sock.configureBlocking(false);
        sock.socket().setSoLinger(false, -1);
        sock.socket().setTcpNoDelay(true);
        return sock;
}
View Code

  接下來connect()方法繼續調用registerAndConnect,該方法真正的向服務器端創建鏈接:

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
    throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
        boolean immediateConnect = sock.connect(addr);
        if (immediateConnect) {
            sendThread.primeConnection();
        }
}
View Code

  能夠看到在registerAndConnect方法中又調用了SendThread的primeConnection()方法,在primeConnection()方法中主要初始化Session、Watch和權限信息,同時註冊ClientCnxnSocketNIO對讀時間和寫時間的監聽。繼續回到SendThread的run()方法。接下來繼續判斷鏈接狀態,若是是state.isConnected()會進行一系列的操做,其中最重要的是調用sendPing()方法和clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);,再此主要分析一下doTransport()方法,  

 1 void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
 2             throws IOException, InterruptedException {
 3         selector.select(waitTimeOut);
 4         Set<SelectionKey> selected;
 5         synchronized (this) {
 6             selected = selector.selectedKeys();
 7         }
 8         updateNow();
 9         for (SelectionKey k : selected) {
10             SocketChannel sc = ((SocketChannel) k.channel());
11             //若是是鏈接事件
12             if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
13                 if (sc.finishConnect()) {
14                     updateLastSendAndHeard();
15                     updateSocketAddresses();
16                     sendThread.primeConnection();
17                 }
18             }
19             //若是是讀寫事件
20              else f((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
21                 doIO(pendingQueue, cnxn);
22             }
23         }
24         if (sendThread.getZkState().isConnected()) {
25             if (findSendablePacket(outgoingQueue,
26                     sendThread.tunnelAuthInProgress()) != null) {
27                 enableWrite();
28             }
29         }
30         selected.clear();
31 }
View Code

  能夠看到最重要的方法是doIO(),在doIO()方法中主要進行讀寫操做.繼續回到SendThread的run方法,看看run()方法在結束時作什麼工做,在run()方法,跳出while循環時代碼以下

synchronized (state) {
                // When it comes to this point, it guarantees that later queued
                // packet to outgoingQueue will be notified of death.
                cleanup();
            }
            //調用selector.close()
            clientCnxnSocket.close();
            if (state.isAlive()) {
                //添加Disconnected事件
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Disconnected, null));
}
View Code

  在SendThread的run()結束前很重要的一步操做是調用cleanup()方法:

 1 private void cleanup() {
 2             //關閉網絡鏈接
 3 clientCnxnSocket.cleanup();
 4             synchronized (pendingQueue) {
 5                 //遍歷pendingQueue,執行conLossPacket
 6                 for (Packet p : pendingQueue) {
 7                     conLossPacket(p);
 8                 }
 9                 //清除pendingQueue
10                 pendingQueue.clear();
11             }
12             // We can't call outgoingQueue.clear() here because
13             // between iterating and clear up there might be new
14             // packets added in queuePacket().
15             Iterator<Packet> iter = outgoingQueue.iterator();
16             while (iter.hasNext()) {
17                 Packet p = iter.next();
18                 conLossPacket(p);
19                 iter.remove();
20             }
21 }
View Code

  在cleanUp方法中最主要的是循環和遍歷pendingQueue和outgoingQueue,並針對兩個隊列中每個Packet調用conLossPacket(p)方法,最後清空兩個隊列,如今具體看一看conLossPacket(p)中具體作了什麼事情,在conLossPacket(p)主要調用了finishPacket(p),如今進finishPacket(p)方法進行分析:

private void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        //watcher的註冊於取消註冊
        ….
        //判斷是否有異步的回調,若是沒有將finished設置爲true,喚醒全部等待的事件
        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
        //有異步回調,將finished設置爲true,並將packet加入到EventThread的隊列
            p.finished = true;
            eventThread.queuePacket(p);
        }
}
View Code

  至此真個Zookeeper鏈接的創建過程就完成了。

相關文章
相關標籤/搜索