一個最簡單的demo以下:node
public class ZookeeperConstructorSimple implements Watcher{ private static CountDownLatch connectedSemaphone=new CountDownLatch(1); public static void main(String[] args) throws IOException { ZooKeeper zooKeeper=new ZooKeeper("127.0.0.1:2181",5000,new ZookeeperConstructorSimple()); System.out.println(zooKeeper.getState()); try { connectedSemaphone.await(); } catch (Exception e) {} System.out.println("ZooKeeper session established"); System.out.println("sessionId="+zooKeeper.getSessionId()); System.out.println("password="+zooKeeper.getSessionPasswd()); } @Override public void process(WatchedEvent event) { System.out.println("my ZookeeperConstructorSimple watcher Receive watched event:"+event); if(KeeperState.SyncConnected==event.getState()){ connectedSemaphone.countDown(); } } }
使用的maven依賴以下:apache
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
對於目前來講,ZooKeeper的服務器端代碼和客戶端代碼仍是混在一塊兒的,估計往後能改吧。數組
使用的ZooKeeper的構造函數有三個參數構成服務器
ZooKeeper集羣的服務器地址列表session
該地址是能夠填寫多個的,以逗號分隔。如"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",那客戶端鏈接的時候究竟是使用哪個呢?先隨機打亂,而後輪詢着用,後面再詳細介紹。dom
sessionTimeout異步
最終會引出三個時間設置:和服務器端協商後的sessionTimeout、readTimeout、connectTimeoutsocket
服務器端使用協商後的sessionTimeout:即超過該時間後,客戶端沒有向服務器端發送任何請求(正常狀況下客戶端會每隔一段時間發送心跳請求,此時服務器端會重新計算客戶端的超時時間點的),則服務器端認爲session超時,清理數據。此時客戶端的ZooKeeper對象就再也不起做用了,須要再從新new一個新的對象了。maven
客戶端使用connectTimeout、readTimeout分別用於檢測鏈接超時和讀取超時,一旦超時,則該客戶端認爲該服務器不穩定,就會重新鏈接下一個服務器地址。ide
Watcher
做爲ZooKeeper對象一個默認的Watcher,用於接收一些事件通知。如和服務器鏈接成功的通知、斷開鏈接的通知、Session過時的通知等。
同時咱們能夠看到,一旦和ZooKeeper服務器鏈接創建成功,就會獲取服務器端分配的sessionId和password,以下:
sessionId=94249128002584594 password=[B@4de3aaf6
下面就經過源碼來詳細說明這個創建鏈接的過程。
首先與ZooKeeper服務器創建鏈接,有兩層鏈接要創建。
創建TCP鏈接以後,客戶端發送ConnectRequest請求,申請創建session關聯,此時服務器端會爲該客戶端分配sessionId和密碼,同時開啓對該session是否超時的檢測。
當在sessionTimeout時間內,即還未超時,此時TCP鏈接斷開,服務器端仍然認爲該sessionId處於存活狀態。此時,客戶端會選擇下一個ZooKeeper服務器地址進行TCP鏈接創建,TCP鏈接創建完成後,拿着以前的sessionId和密碼發送ConnectRequest請求,若是還未到該sessionId的超時時間,則表示自動重連成功,對客戶端用戶是透明的,一切都在背後默默執行,ZooKeeper對象是有效的。
若是從新創建TCP鏈接後,已經達到該sessionId的超時時間了(服務器端就會清理與該sessionId相關的數據),則返回給客戶端的sessionTimeout時間爲0,sessionid爲0,密碼爲空字節數組。客戶端接收到該數據後,會判斷協商後的sessionTimeout時間是否小於等於0,若是小於等於0,則使用eventThread線程先發出一個KeeperState.Expired事件,通知相應的Watcher,而後結束EventThread線程的循環,開始走向結束。此時ZooKeeper對象就是無效的了,必需要從新new一個新的ZooKeeper對象,分配新的sessionId了。
它是面向用戶的,提供一些操做API。
它又兩個重要的屬性:
如建立某個node操做(同步方式):
ZooKeeper對象負責建立出Request,並交給ClientCnxn來執行,ZooKeeper對象再對返回結果進行處理。
下面來看下異步回調的方式建立node:
同步方式提交一個請求後,開始循環判斷該請求包的狀態是否結束,即處於阻塞狀態,一旦結束則繼續往下走下去,返回結果。異步方式則提交一個請求後,直接返回,對結果的處理邏輯包含在回調函數中。一旦該對該請求包響應完畢,則取出回調函數執行相應的回調方法。
至此簡單瞭解了,ZooKeeper對象主要封裝用戶的請求以及處理響應等操做。用戶請求的執行所有交給ClientCnxn來執行,那咱們就詳細看下ClientCnxn的來源及大致內容。
先看看ClientCnxn是怎麼來的:
第二步:將鏈接字符串信息交給ConnectStringParser進行解析
鏈接字符串好比: "192.168.12.1:2181,192.168.12.2:2181,192.168.12.3:2181/root"
解析結果以下:
獲得兩個數據String chrootPath默認的跟路徑和ArrayList<InetSocketAddress> serverAddresses即多個host和port信息。
第三步:根據上述解析的host和port列表結果,建立一個HostProvider
有了ConnectStringParser的解析結果,爲何還須要一個HostProvider再來包裝下呢?主要是爲未來留下擴展的餘地
來看下HostProvider的詳細接口介紹:
HostProvider主要負責不斷的對外提供可用的ZooKeeper服務器地址,這些服務器地址能夠是從一個url中加載得來或者其餘途徑得來。同時對於不一樣的ZooKeeper客戶端,給出就近的ZooKeeper服務器地址等。
來看下默認的HostProvider實現StaticHostProvider:
有三個屬性,一個就是服務器地址列表(通過以下方式隨機打亂了):
Collections.shuffle(this.serverAddresses)
另外兩個屬性用於標記,下面來具體看下,StaticHostProvider是如何實現不斷的對外提供ZooKeeper服務器地址的:
代碼也很簡單,就是在打亂的服務器地址列表中,不斷地遍歷,到頭以後,在從0開始。
上面的spinDelay是個什麼狀況呢?
正常狀況下,currentIndex先加1,而後返回currentIndex+1的地址,當該地址鏈接成功後會執行onConnected方法,即lastIndex = currentIndex了。然而當返回的currentIndex+1的地址鏈接不成功,繼續嘗試下一個,仍不成功,仍繼續下一個,就會遇到currentIndex=lastIndex的狀況,此時即輪詢了一遍,仍然沒有一個地址可以鏈接上,此時的策略就是先暫停休息休息,而後再繼續。
第四步:爲建立ClientCnxn準備參數並建立ClientCnxn。
首先是經過getClientCnxnSocket()獲取一個ClientCnxnSocket。來看下ClientCnxnSocket是主要作什麼工做的:
A ClientCnxnSocket does the lower level communication with a socket implementation. This code has been moved out of ClientCnxn so that a Netty implementation can be provided as an alternative to the NIO socket code.
專門用於負責socket通訊的,把一些公共部分抽象出來,其餘的留給不一樣的實現者來實現。如能夠選擇默認的ClientCnxnSocketNIO,也可使用netty等。
來看下getClientCnxnSocket()的獲取ClientCnxnSocket的過程:
首先獲取系統參數"zookeeper.clientCnxnSocket",若是沒有的話,使用默認的ClientCnxnSocketNIO,因此咱們能夠經過指定該參數來替換默認的實現。
參數準備好了,ClientCnxn是如何來建立的呢?
首先就是保存一些對象參數,此時的sessionId和sessionPasswd都尚未。而後就是兩個timeout參數:connectTimeout和readTimeout。在ClientCnxn的發送和接收數據的線程中,會不斷的檢測鏈接超時和讀取超時,一旦出現超時,就認爲服務不穩定,須要更換服務器,就會從HostProvider中獲取下一個服務器地址進行鏈接。
最後就是兩個線程,一個事件線程即EventThread,一個發送和接收socket數據的線程即SendThread。
事件線程EventThread呢就是從一個事件隊列中不斷取出事件並進行處理:
看下具體的處理過程,主要分紅兩種狀況,一種就是咱們註冊的watch事件,另外一種就是處理異步回調函數:
能夠看到這裏就是觸發咱們註冊Watch的,還有觸發上文提到的異步回調的狀況的。
明白了EventThread是如何來處理事件的,須要知道這些事件是如何來的:
對外提供了三個方法來添加不一樣類型的事件,如SendThread線程就會調用這三個方法來添加事件。其中對於事件通知,會首先根據ZKWatchManager watchManager來獲取關心該事件的全部Watcher,而後觸發他們。
再來看看SendThread的工做內容:
sendThread = new SendThread(clientCnxnSocket); 把傳遞給ClientCnxn的clientCnxnSocket,再傳遞給SendThread,讓它服務於SendThread。
在SendThread的run方法中,有一個while循環,不斷的作着如下幾件事:
任務1:不斷檢測clientCnxnSocket是否和服務器處於鏈接狀態,若是是未鏈接狀態,則從hostProvider中取出一個服務器地址,使用clientCnxnSocket進行鏈接。
和服務器創建鏈接成功後,開始發送ConnectRequest請求,把該請求放到outgoingQueue請求隊列中,等待被髮送給服務器
任務2:檢測是否超時:當處於鏈接狀態時,檢測是否讀超時,當處於未鏈接狀態時,檢測是否鏈接超時
一旦超時,則拋出SessionTimeoutException,而後看下是如何處理呢?
能夠看到一旦發生超時異常或者其餘異常,都會進行清理,並設置鏈接狀態爲未鏈接,而後發送Disconnected事件。至此又會進入任務1的流程
任務3:不斷的發送ping通知,服務器端每接收到ping請求,就會從當前時間從新計算session過時時間,因此當客戶端按照必定時間間隔不斷的發送ping請求,就能保證客戶端的session不會過時。發送時間間隔以下:
clientCnxnSocket.getIdleSend():是最後一次發送數據包的時間與當前時間的間隔。當readTimeout的時間已通過去一半多了,都沒有發送數據包的話,則執行一次Ping發送。或者過去MAX_SEND_PING_INTERVAL(10s)都尚未發送數據包的話,則執行一次Ping發送。
ping發送的內容只有請求頭OpCode.ping的標示,其餘都爲空。發送ping請求,也是把該請求放到outgoingQueue發送隊列中,等待被執行。
任務4:執行IO操做,即發送請求隊列中的請求和讀取服務器端的響應數據。
首先從outgoingQueue請求隊列中取出第一個請求,而後進行序列化,而後使用socket進行發送。
讀取服務器端數據以下:
分爲兩種:一種是讀取針對ConnectRequest請求的響應,另外一種就是其餘響應,先暫時不說。
先來看看針對ConnectRequest請求的響應:
首先進行反序列化,獲得ConnectResponse對象,咱們就能夠獲取到服務器端給咱們客戶端分配的sessionId和passwd,以及協商後的sessionTimeOut時間。
首選要根據協商後的sessionTimeout時間,從新計算readTimeout和connectTimeout值。而後保留和記錄sessionId和passwd。最後經過EventThread發送一個SyncConnected鏈接成功事件。至此,TCP鏈接和session初始化請求都完成了,客戶端的ZooKeeper對象能夠正常使用了。
至此,咱們便了解客戶端與服務器端創建鏈接的過程。
服務器端狀況分不少種,先暫時說最簡單的單機版。同時也再也不給出服務器端的啓動過程(後面的文章再來詳細說明)。
首先介紹下服務器端的大致概況:
服務器端默認採用NIOServerCnxnFactory來負責socket的處理。每來一個客戶端socket請求,爲該客戶端建立一個NIOServerCnxn。以後與該客戶端的交互,就交給了NIOServerCnxn來處理。對於客戶端的ConnectRequest請求,處理以下:
首先反序列化出ConnectRequest
而後開始協商sessionTimeout時間
即判斷用戶傳遞過來的sessionTimeout時間是否在minSessionTimeout、maxSessionTimeout之間。協商完成以後,根據用戶傳遞過來的sessionId是不是0進行不一樣的處理。客戶端第一次請求,sessionId爲0。當客戶端已經鏈接過一個服務器地址,分配了sessionId,而後若是發生超時等異常,客戶端會去拿着已經分配的sessionId去鏈接下一個服務器地址,此時的sessionId不爲0。 sessionId爲0,則表明着要建立session。sessionId不爲0,則須要對該sessionId進行合法性檢查,以及是否已通過期了的檢查。 咱們先來看看sessionId爲0的狀況: ![建立session](https://static.oschina.net/uploads/img/201508/01065436_4nHs.png "建立session") 大致上分三大步:一、使用sessionTracker根據sessionTimeout時間建立一個新的session 二、根據sessionId建立出密碼 三、提交這個建立session的請求到請求處理器鏈,最終將sessionId和密碼返回給客戶端
下面來分別詳細的說明這三個過程:
SessionTracker是用來建立刪除session,執行session的過時檢查的。
直接看下默認使用的SessionTrackerImpl:
先看下session有哪些屬性:
而後再來看看SessionTracker的幾個數據:
要搞清楚的內容有:1 建立session的過程 2 session過時檢查的過程
先來看看建立session的過程:
代碼很簡單,就是建立一個SessionImpl對象,而後存儲到SessionTracker中,同時開始計算session的超時時間。這裏有一個內容就是sessionId的來歷,咱們能夠看到就是根據nextSessionId來的,而且是不斷自增的。
sessionId是一個客戶端的重要標示,是全局惟一的,先來看看單機版的nextSessionId初始化:
單機版的服務器使用1經過計算來初始化nextSessionId。而集羣版對應的id則分別是每一個機器指定的sid。計算過程以下:
綜上所示保證了sessionId是惟一的,不會出現重複分配的狀況。
搞清楚了sessionId的分配,接下來就要弄清楚如何進行session的過時檢查問題:
咱們先看下,session激活過程是怎麼處理的:
首先獲取這個session數據,而後計算它的超期時間
long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
private long roundToInterval(long time) {
// We give a one interval grace period return (time / expirationInterval + 1) * expirationInterval;
}
便是拿當前時間加上這個session的timeout時間,而後對其進行取expirationInterval的整,即始終保持是expirationInterval的正數倍,即每一個session的過時時間點最終都會落在expirationInterval的整數倍上。
若是本來該session的超期時間就大於你所計算出的超期時間,則不作任何處理,不然設置該session的超期時間爲上述計算結果的超期時間。
取出本來該session所在的超期時間,從集合裏面刪除
從新獲取如今超期時間所在的集合,添加進去
綜上所述,session的激活其實就是從新計算下超時時間,最終取expirationInterval的正數倍,而後從以前時間點的集合中移除,而後再添加到新的時間點的集合中去。
至此,session的檢查就方便多了,只須要在expirationInterval整數時間點上取出集合,而後一個個標記爲過時便可。而那些不斷被激活的session,則不斷的從一個時間點的集合中換到下一個時間點的集合中。
SessionTrackerImpl也是一個線程,該線程執行內容就是session的過時檢查,以下所示:
回到建立session的三大步驟:
來看下密碼是如何來產生的:
Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd);
其中superSecret爲常量
static final private long superSecret = 0XB3415C00L;
使用Random的方式來隨機生成字節數組。可是該字節數組,只要參數即sessionId相同,字節數組的內容就相同。即當咱們知道了sessionId,就能夠利用上述方式算出對應的密碼,我感受密碼基本上沒什麼用。
再看下當客戶端帶着sessionId和密碼進行鏈接的時候,這時會進行密碼的檢查:
看了上面的代碼,就再次驗證了密碼沒什麼鳥用,知道了sessionId,就徹底知道了密碼。因此這一塊有待改進吧,應該不能由sessionId徹底決定吧,如再加上當前時間等等,讓客戶端造不出來密碼,同時服務器端存儲加密後的密碼。
本文內容已太多,這裏就先簡單描述下,以後再詳細的講解
若是是成功建立session,則把sessionTimeout、sessionId、passwd傳遞給客戶端。若是沒有成功建立,上述三者的值分別是0,0,new byte[16]
以後客戶端處理該響應的過程,上面已經說了,能夠回頭再看下。
下一篇文章開始講述zooKeeper集羣時,服務器端對用戶的建立鏈接請求的處理。