ZooKeeper源碼研究系列 客戶端建立鏈接過程分析

1 系列目錄

2 客戶端API簡單使用

2.1 demo案例1

一個最簡單的demo以下:node

public class ZookeeperConstructorSimple implements Watcher{

    private static CountDownLatch connectedSemaphone=new CountDownLatch(1);

    public static void main(String[] args) throws IOException {
        ZooKeeper zooKeeper=new ZooKeeper("127.0.0.1:2181",5000,new ZookeeperConstructorSimple());
        System.out.println(zooKeeper.getState());
        try {
            connectedSemaphone.await();
        } catch (Exception e) {}
        System.out.println("ZooKeeper session established");
        System.out.println("sessionId="+zooKeeper.getSessionId());
        System.out.println("password="+zooKeeper.getSessionPasswd());
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("my ZookeeperConstructorSimple watcher Receive watched event:"+event);
        if(KeeperState.SyncConnected==event.getState()){
            connectedSemaphone.countDown();
        }
    }

}

使用的maven依賴以下:apache

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

對於目前來講,ZooKeeper的服務器端代碼和客戶端代碼仍是混在一塊兒的,估計往後能改吧。數組

使用的ZooKeeper的構造函數有三個參數構成服務器

  • ZooKeeper集羣的服務器地址列表session

    該地址是能夠填寫多個的,以逗號分隔。如"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",那客戶端鏈接的時候究竟是使用哪個呢?先隨機打亂,而後輪詢着用,後面再詳細介紹。dom

  • sessionTimeout異步

    最終會引出三個時間設置:和服務器端協商後的sessionTimeout、readTimeout、connectTimeoutsocket

    服務器端使用協商後的sessionTimeout:即超過該時間後,客戶端沒有向服務器端發送任何請求(正常狀況下客戶端會每隔一段時間發送心跳請求,此時服務器端會重新計算客戶端的超時時間點的),則服務器端認爲session超時,清理數據。此時客戶端的ZooKeeper對象就再也不起做用了,須要再從新new一個新的對象了。maven

    客戶端使用connectTimeout、readTimeout分別用於檢測鏈接超時和讀取超時,一旦超時,則該客戶端認爲該服務器不穩定,就會重新鏈接下一個服務器地址。ide

  • Watcher

    做爲ZooKeeper對象一個默認的Watcher,用於接收一些事件通知。如和服務器鏈接成功的通知、斷開鏈接的通知、Session過時的通知等。

同時咱們能夠看到,一旦和ZooKeeper服務器鏈接創建成功,就會獲取服務器端分配的sessionId和password,以下:

sessionId=94249128002584594
password=[B@4de3aaf6

下面就經過源碼來詳細說明這個創建鏈接的過程。

3 客戶端的創建鏈接的過程

3.1 大致鏈接過程概述

首先與ZooKeeper服務器創建鏈接,有兩層鏈接要創建。

  • 客戶端與服務器端的TCP鏈接
  • 在TCP鏈接的基礎上創建session關聯

創建TCP鏈接以後,客戶端發送ConnectRequest請求,申請創建session關聯,此時服務器端會爲該客戶端分配sessionId和密碼,同時開啓對該session是否超時的檢測。

當在sessionTimeout時間內,即還未超時,此時TCP鏈接斷開,服務器端仍然認爲該sessionId處於存活狀態。此時,客戶端會選擇下一個ZooKeeper服務器地址進行TCP鏈接創建,TCP鏈接創建完成後,拿着以前的sessionId和密碼發送ConnectRequest請求,若是還未到該sessionId的超時時間,則表示自動重連成功,對客戶端用戶是透明的,一切都在背後默默執行,ZooKeeper對象是有效的。

若是從新創建TCP鏈接後,已經達到該sessionId的超時時間了(服務器端就會清理與該sessionId相關的數據),則返回給客戶端的sessionTimeout時間爲0,sessionid爲0,密碼爲空字節數組。客戶端接收到該數據後,會判斷協商後的sessionTimeout時間是否小於等於0,若是小於等於0,則使用eventThread線程先發出一個KeeperState.Expired事件,通知相應的Watcher,而後結束EventThread線程的循環,開始走向結束。此時ZooKeeper對象就是無效的了,必需要從新new一個新的ZooKeeper對象,分配新的sessionId了。

3.2 ZooKeeper對象

它是面向用戶的,提供一些操做API。

它又兩個重要的屬性:

  • ClientCnxn cnxn:負責全部的ZooKeeper節點操做的執行
  • ZKWatchManager watchManager:負責維護某個path上註冊的Watcher

如建立某個node操做(同步方式):

ZooKeeper對象負責建立出Request,並交給ClientCnxn來執行,ZooKeeper對象再對返回結果進行處理。

ZooKeeper同步方式建立節點操做

下面來看下異步回調的方式建立node:

ZooKeeper異步方式建立節點操做

同步方式提交一個請求後,開始循環判斷該請求包的狀態是否結束,即處於阻塞狀態,一旦結束則繼續往下走下去,返回結果。異步方式則提交一個請求後,直接返回,對結果的處理邏輯包含在回調函數中。一旦該對該請求包響應完畢,則取出回調函數執行相應的回調方法。

至此簡單瞭解了,ZooKeeper對象主要封裝用戶的請求以及處理響應等操做。用戶請求的執行所有交給ClientCnxn來執行,那咱們就詳細看下ClientCnxn的來源及大致內容。

先看看ClientCnxn是怎麼來的:

輸入圖片說明

  • 第一步:爲ZKWatchManager watchManager設置一個默認的Watcher
  • 第二步:將鏈接字符串信息交給ConnectStringParser進行解析

    鏈接字符串好比: "192.168.12.1:2181,192.168.12.2:2181,192.168.12.3:2181/root"

    解析結果以下:

    ConnectStringParser解析結果

    獲得兩個數據String chrootPath默認的跟路徑和ArrayList<InetSocketAddress> serverAddresses即多個host和port信息。

  • 第三步:根據上述解析的host和port列表結果,建立一個HostProvider

    有了ConnectStringParser的解析結果,爲何還須要一個HostProvider再來包裝下呢?主要是爲未來留下擴展的餘地

    來看下HostProvider的詳細接口介紹:

    HostProvider的詳細接口介紹

    HostProvider主要負責不斷的對外提供可用的ZooKeeper服務器地址,這些服務器地址能夠是從一個url中加載得來或者其餘途徑得來。同時對於不一樣的ZooKeeper客戶端,給出就近的ZooKeeper服務器地址等。

    來看下默認的HostProvider實現StaticHostProvider:

    StaticHostProvider

    有三個屬性,一個就是服務器地址列表(通過以下方式隨機打亂了):

    Collections.shuffle(this.serverAddresses)

    另外兩個屬性用於標記,下面來具體看下,StaticHostProvider是如何實現不斷的對外提供ZooKeeper服務器地址的:

    StaticHostProvider提供ZooKeeper服務器地址

    代碼也很簡單,就是在打亂的服務器地址列表中,不斷地遍歷,到頭以後,在從0開始。

    上面的spinDelay是個什麼狀況呢?

    正常狀況下,currentIndex先加1,而後返回currentIndex+1的地址,當該地址鏈接成功後會執行onConnected方法,即lastIndex = currentIndex了。然而當返回的currentIndex+1的地址鏈接不成功,繼續嘗試下一個,仍不成功,仍繼續下一個,就會遇到currentIndex=lastIndex的狀況,此時即輪詢了一遍,仍然沒有一個地址可以鏈接上,此時的策略就是先暫停休息休息,而後再繼續。

  • 第四步:爲建立ClientCnxn準備參數並建立ClientCnxn。

    首先是經過getClientCnxnSocket()獲取一個ClientCnxnSocket。來看下ClientCnxnSocket是主要作什麼工做的:

    A ClientCnxnSocket does the lower level communication with a socket implementation. This code has been moved out of ClientCnxn so that a Netty implementation can be provided as an alternative to the NIO socket code.

    專門用於負責socket通訊的,把一些公共部分抽象出來,其餘的留給不一樣的實現者來實現。如能夠選擇默認的ClientCnxnSocketNIO,也可使用netty等。

    來看下getClientCnxnSocket()的獲取ClientCnxnSocket的過程:

    getClientCnxnSocket過程

    首先獲取系統參數"zookeeper.clientCnxnSocket",若是沒有的話,使用默認的ClientCnxnSocketNIO,因此咱們能夠經過指定該參數來替換默認的實現。

    參數準備好了,ClientCnxn是如何來建立的呢?

    ClientCnxn的建立過程

    首先就是保存一些對象參數,此時的sessionId和sessionPasswd都尚未。而後就是兩個timeout參數:connectTimeout和readTimeout。在ClientCnxn的發送和接收數據的線程中,會不斷的檢測鏈接超時和讀取超時,一旦出現超時,就認爲服務不穩定,須要更換服務器,就會從HostProvider中獲取下一個服務器地址進行鏈接。

    最後就是兩個線程,一個事件線程即EventThread,一個發送和接收socket數據的線程即SendThread。

    事件線程EventThread呢就是從一個事件隊列中不斷取出事件並進行處理:

    EventThread的工做職責

    看下具體的處理過程,主要分紅兩種狀況,一種就是咱們註冊的watch事件,另外一種就是處理異步回調函數:

    watch處理和異步回調

    能夠看到這裏就是觸發咱們註冊Watch的,還有觸發上文提到的異步回調的狀況的。

    明白了EventThread是如何來處理事件的,須要知道這些事件是如何來的:

    EventThread添加事件

    對外提供了三個方法來添加不一樣類型的事件,如SendThread線程就會調用這三個方法來添加事件。其中對於事件通知,會首先根據ZKWatchManager watchManager來獲取關心該事件的全部Watcher,而後觸發他們。

    再來看看SendThread的工做內容:

    sendThread = new SendThread(clientCnxnSocket); 把傳遞給ClientCnxn的clientCnxnSocket,再傳遞給SendThread,讓它服務於SendThread。

    在SendThread的run方法中,有一個while循環,不斷的作着如下幾件事:

    • 任務1:不斷檢測clientCnxnSocket是否和服務器處於鏈接狀態,若是是未鏈接狀態,則從hostProvider中取出一個服務器地址,使用clientCnxnSocket進行鏈接。

      和服務器創建鏈接

      和服務器創建鏈接成功後,開始發送ConnectRequest請求,把該請求放到outgoingQueue請求隊列中,等待被髮送給服務器

      創建socket鏈接後發送ConnectRequest請求來初始化session

    • 任務2:檢測是否超時:當處於鏈接狀態時,檢測是否讀超時,當處於未鏈接狀態時,檢測是否鏈接超時

      檢測讀超時或者socket鏈接超時

      一旦超時,則拋出SessionTimeoutException,而後看下是如何處理呢?

      異常處理

      能夠看到一旦發生超時異常或者其餘異常,都會進行清理,並設置鏈接狀態爲未鏈接,而後發送Disconnected事件。至此又會進入任務1的流程

    • 任務3:不斷的發送ping通知,服務器端每接收到ping請求,就會從當前時間從新計算session過時時間,因此當客戶端按照必定時間間隔不斷的發送ping請求,就能保證客戶端的session不會過時。發送時間間隔以下:

      發送Ping通知的機制

      clientCnxnSocket.getIdleSend():是最後一次發送數據包的時間與當前時間的間隔。當readTimeout的時間已通過去一半多了,都沒有發送數據包的話,則執行一次Ping發送。或者過去MAX_SEND_PING_INTERVAL(10s)都尚未發送數據包的話,則執行一次Ping發送。

      Ping發送的內容

      ping發送的內容只有請求頭OpCode.ping的標示,其餘都爲空。發送ping請求,也是把該請求放到outgoingQueue發送隊列中,等待被執行。

    • 任務4:執行IO操做,即發送請求隊列中的請求和讀取服務器端的響應數據。

      發送請求隊列中的請求

      首先從outgoingQueue請求隊列中取出第一個請求,而後進行序列化,而後使用socket進行發送。

      讀取服務器端數據以下:

      讀取服務器端響應

      分爲兩種:一種是讀取針對ConnectRequest請求的響應,另外一種就是其餘響應,先暫時不說。

      先來看看針對ConnectRequest請求的響應:

      讀取ConnectResponse的內容

      首先進行反序列化,獲得ConnectResponse對象,咱們就能夠獲取到服務器端給咱們客戶端分配的sessionId和passwd,以及協商後的sessionTimeOut時間。

      session獲取成功後,重置參數

      首選要根據協商後的sessionTimeout時間,從新計算readTimeout和connectTimeout值。而後保留和記錄sessionId和passwd。最後經過EventThread發送一個SyncConnected鏈接成功事件。至此,TCP鏈接和session初始化請求都完成了,客戶端的ZooKeeper對象能夠正常使用了。

      至此,咱們便了解客戶端與服務器端創建鏈接的過程。

4 服務器端處理鏈接的過程

服務器端狀況分不少種,先暫時說最簡單的單機版。同時也再也不給出服務器端的啓動過程(後面的文章再來詳細說明)。

首先介紹下服務器端的大致概況:

  • 首先是服務器端的配置文件,有tickTime、minSessionTimeout、maxSessionTimeout相關屬性。默認狀況下,tickTime是3000ms,minSessionTimeout是2倍的tickTime,maxSessionTimeout是20倍的tickTime。
  • 服務器端默認採用NIOServerCnxnFactory來負責socket的處理。每來一個客戶端socket請求,爲該客戶端建立一個NIOServerCnxn。以後與該客戶端的交互,就交給了NIOServerCnxn來處理。對於客戶端的ConnectRequest請求,處理以下:

    首先反序列化出ConnectRequest

    反序列化ConnectRequest

    而後開始協商sessionTimeout時間

輸入圖片說明

即判斷用戶傳遞過來的sessionTimeout時間是否在minSessionTimeout、maxSessionTimeout之間。協商完成以後,根據用戶傳遞過來的sessionId是不是0進行不一樣的處理。客戶端第一次請求,sessionId爲0。當客戶端已經鏈接過一個服務器地址,分配了sessionId,而後若是發生超時等異常,客戶端會去拿着已經分配的sessionId去鏈接下一個服務器地址,此時的sessionId不爲0。

sessionId爲0,則表明着要建立session。sessionId不爲0,則須要對該sessionId進行合法性檢查,以及是否已通過期了的檢查。

咱們先來看看sessionId爲0的狀況:

![建立session](https://static.oschina.net/uploads/img/201508/01065436_4nHs.png "建立session")

大致上分三大步:一、使用sessionTracker根據sessionTimeout時間建立一個新的session 二、根據sessionId建立出密碼
三、提交這個建立session的請求到請求處理器鏈,最終將sessionId和密碼返回給客戶端

下面來分別詳細的說明這三個過程:

4.1 使用sessionTracker建立session

SessionTracker是用來建立刪除session,執行session的過時檢查的。

直接看下默認使用的SessionTrackerImpl:

SessionTrackerImpl的總體內容

先看下session有哪些屬性:

  • final long sessionId:session的惟一標示
  • final int timeout:這個session的timeout時間(即上文中客戶端和服務器端商定下來的timeout時間)
  • long tickTime:這個session的下一次超時時間點(隨着客戶端不斷的發送PING請求,就會不斷的刷新該時間,不斷的日後變化)
  • boolean isClosing:session的標示符,用於標示session是否還被正常使用
  • Object owner:建立該session的owner。會在客戶端更換所鏈接的服務器的時候用到(以後詳細說明)

而後再來看看SessionTracker的幾個數據:

  • HashMap<Long, SessionImpl> sessionsById:很簡單,以sessionId存儲session
  • ConcurrentHashMap<Long, Integer> sessionsWithTimeout:以sessionId存儲每一個session的timeout時間
  • HashMap<Long, SessionSet> sessionSets:某個時間點上的session集合(用於session過時檢查)
  • long nextSessionId:初始的sessionId,以後建立的sessionId就在此基礎上自增
  • nextExpirationTime:下一次過時時間點,每當到該時間點就會進行一次session的過時檢查
  • expirationInterval:session過時檢查的週期

要搞清楚的內容有:1 建立session的過程 2 session過時檢查的過程

先來看看建立session的過程:

建立session

代碼很簡單,就是建立一個SessionImpl對象,而後存儲到SessionTracker中,同時開始計算session的超時時間。這裏有一個內容就是sessionId的來歷,咱們能夠看到就是根據nextSessionId來的,而且是不斷自增的。

sessionId是一個客戶端的重要標示,是全局惟一的,先來看看單機版的nextSessionId初始化:

建立SessionTrackerImpl

SessionTrackerImpl構造函數

初始化nextSessionId

單機版的服務器使用1經過計算來初始化nextSessionId。而集羣版對應的id則分別是每一個機器指定的sid。計算過程以下:

初始化nextSessionId說明

  • 第一步:就是取當前時間,爲 10100111011100110110010101110100111100011 爲41爲二進制
  • 第二步:long有64位,左移24位,實際上是除掉了前面的1,後面補了24位的0。
  • 第三步:第二步的結果多是正數也多是負數,目前是正數,以後可能就是負數了,你能夠算一下須要多少年,哈哈。爲了保證右移的時候,進行補0操做,須要使用無符號右移,即>>>。這裏使用了無符號右移8位
  • 第四步:將傳過來的id這裏即1左移56位。而後再與第三步的正數結果進行或操做,獲得最終的基準nextSessionId,因此當這裏的id值不是很大的話,通常幾臺機器而已,也保證了sessionId是一個正數,同時前八位就是機器的sid號。因此每臺機器的的前八位是不一樣的,保證了每臺機器中不會配置相同的sessionId,每臺機器的sessionId又是自增操做,因此單臺機器內sessionId也是不會重複的。

綜上所示保證了sessionId是惟一的,不會出現重複分配的狀況。

搞清楚了sessionId的分配,接下來就要弄清楚如何進行session的過時檢查問題:

咱們先看下,session激活過程是怎麼處理的:

session的激活

  • 首先獲取這個session數據,而後計算它的超期時間

    long expireTime = roundToInterval(System.currentTimeMillis() + timeout);

    private long roundToInterval(long time) {

    // We give a one interval grace period
       return (time / expirationInterval + 1) * expirationInterval;

    }

    便是拿當前時間加上這個session的timeout時間,而後對其進行取expirationInterval的整,即始終保持是expirationInterval的正數倍,即每一個session的過時時間點最終都會落在expirationInterval的整數倍上。

  • 若是本來該session的超期時間就大於你所計算出的超期時間,則不作任何處理,不然設置該session的超期時間爲上述計算結果的超期時間。

  • 取出本來該session所在的超期時間,從集合裏面刪除

  • 從新獲取如今超期時間所在的集合,添加進去

綜上所述,session的激活其實就是從新計算下超時時間,最終取expirationInterval的正數倍,而後從以前時間點的集合中移除,而後再添加到新的時間點的集合中去。

至此,session的檢查就方便多了,只須要在expirationInterval整數時間點上取出集合,而後一個個標記爲過時便可。而那些不斷被激活的session,則不斷的從一個時間點的集合中換到下一個時間點的集合中。

SessionTrackerImpl也是一個線程,該線程執行內容就是session的過時檢查,以下所示:

SessionTrackerImpl線程執行過時檢查

4.2 根據sessionId建立出密碼

回到建立session的三大步驟:

建立session

來看下密碼是如何來產生的:

Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);

其中superSecret爲常量

static final private long superSecret = 0XB3415C00L;

使用Random的方式來隨機生成字節數組。可是該字節數組,只要參數即sessionId相同,字節數組的內容就相同。即當咱們知道了sessionId,就能夠利用上述方式算出對應的密碼,我感受密碼基本上沒什麼用。

再看下當客戶端帶着sessionId和密碼進行鏈接的時候,這時會進行密碼的檢查:

檢查密碼

看了上面的代碼,就再次驗證了密碼沒什麼鳥用,知道了sessionId,就徹底知道了密碼。因此這一塊有待改進吧,應該不能由sessionId徹底決定吧,如再加上當前時間等等,讓客戶端造不出來密碼,同時服務器端存儲加密後的密碼。

4.2 提交這個建立session的請求到請求處理器鏈

本文內容已太多,這裏就先簡單描述下,以後再詳細的講解

輸入圖片說明

若是是成功建立session,則把sessionTimeout、sessionId、passwd傳遞給客戶端。若是沒有成功建立,上述三者的值分別是0,0,new byte[16]

以後客戶端處理該響應的過程,上面已經說了,能夠回頭再看下。

5 結束語

下一篇文章開始講述zooKeeper集羣時,服務器端對用戶的建立鏈接請求的處理。

相關文章
相關標籤/搜索