ZooKeeper源碼研究系列(3)單機版服務器介紹

#1 系列目錄node

#2 單機版服務器啓動方式服務器

單機版的服務器啓動,使用ZooKeeperServerMain的main函數來啓動,參數分爲兩種:網絡

  • 只有一個參數:表示爲一個配置文件地址
  • 有2~4個參數:分別表示端口、dataDir、tickTime、maxClientCnxns

詳細介紹見開篇的介紹運行ZooKeepersession

接下來看下啓動的整個過程:socket

輸入圖片說明

  • 第一步:建立一個ZooKeeperServer,表明着一個服務器對象
  • 第二步:根據配置參數dataLogDir和dataDir建立出用於管理事務日誌和快照的對象FileTxnSnapLog
  • 第三步:對ZooKeeperServer設置一些配置參數,如tickTime、minSessionTimeout、maxSessionTimeout
  • 第四步:建立ServerCnxnFactory,用於建立ServerSocket,等待客戶端的socket鏈接
  • 第五步:啓動ZooKeeperServer服務

後面第四部分詳細說明。函數

#3 ZooKeeperServer服務器對象源碼分析

ZooKeeperServer是單機版才使用的服務器對象,集羣版都是使用的是它的子類,來看下繼承類圖.net

ZooKeeperServer類圖

能夠看到,集羣版分別用的是LeaderZooKeeperServer、FollowerZooKeeperServer、ObserverZooKeeperServer。後二者都屬於LearnerZooKeeperServer。線程

##3.1 ZooKeeperServer的重要屬性代理

ZooKeeperServer的重要屬性

  • tickTime:默認3000ms,用於計算默認的minSessionTimeout、maxSessionTimeout。計算方式以下:

    public int getMinSessionTimeout() {
    	return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
    }
    
    public int getMaxSessionTimeout() {
    	return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
    }

    同時還用於指定SessionTrackerImpl的執行過時檢查的週期時間,詳細見說明使用sessionTracker的session過時檢查

  • minSessionTimeout、maxSessionTimeout:用於限制客戶段給出的sessionTimeout時間

  • SessionTracker sessionTracker:負責建立和管理session,同時負責定時進行過時檢查

  • ZKDatabase zkDb:用於存儲ZooKeeper樹形數據的模型

  • FileTxnSnapLog txnLogFactory:負責管理事務日誌和快照日誌文件,能根據它加載出數據到ZKDatabase中,同時能將ZKDatabase中的數據以及session保存到快照日誌文件中。後面會詳細說明FileTxnSnapLog。

    操做以下:

    new ZKDatabase(txnLogFactory)
    
    txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
  • RequestProcessor firstProcessor:ZooKeeperServer請求處理器鏈中的第一個處理器

  • long hzxid:ZooKeeperServer最大的事務編號,每來一個事務請求,都會分配一個事務編號

  • ServerCnxnFactory serverCnxnFactory:負責建立ServerSocket,接受客戶端的socket鏈接

  • ServerStats serverStats:負責統計server的運行狀態

##3.2 ZKDatabase介紹

先來看下ZKDatabase的註釋和屬性

ZKDatabase的註釋和屬性

從註釋中能夠看到ZKDatabase中所包含的信息有:

  • sessions信息,即ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,也就是說僅僅會保存sessionId對應的timeout時間
  • DataTree:即ZooKeeper的內存節點信息
  • LinkedList<Proposal> committedLog:用於保存最近提交的一些事物

來重點看下DataTree的實現:

  • ConcurrentHashMap<String, DataNode> nodes =new ConcurrentHashMap<String, DataNode>();

    維護了path對應的DataNode。每一個DataNode內容以下:

    DataNode內容

    有DataNode parent和Set<String> children,同時byte data[]存儲本節點的數據。StatPersisted stat存儲本節點的狀態信息

  • Map<Long, HashSet<String>> ephemerals =new ConcurrentHashMap<Long, HashSet<String>>()

    維護了每一個session對應的臨時節點的集合

  • WatchManager dataWatches、WatchManager childWatches分別用於管理節點自身數據更新的事件觸發和該節點的全部子節點變更的事件觸發。

    每一個WatchManager的結構以下:

    WatchManager的結構

    watchTable維護着每一個path對應的Watcher。watch2Paths維護着每一個Watcher監控的全部path,即每一個Watcher是能夠監控多個path的。在服務器端Watcher的實現實際上是ServerCnxn,以下:

    public abstract class ServerCnxn implements Stats, Watcher

    而每一個ServerCnxn則表明服務器端爲每一個客戶端分配的handler,負責與客戶端進行通訊。客戶端每次對某個path註冊的Watcher,在傳輸給服務器端的時候僅僅是傳輸一個boolean值,便是否監聽某個path,並無把咱們自定義註冊的Watcher傳輸到服務器端(何況Watcher也不能序列化),而是在本地客戶端進行存儲,存儲着對某個path註冊的Watcher。服務器端接收到該boolean值以後,若是爲true,則把該客戶端對應的ServerCnxn做爲Watcher存儲到上述WatchManager中,即上述WatchManager中存儲的是一個個ServerCnxn實例。一旦服務器端數據變化,觸發對應的ServerCnxn,ServerCnxn而後把該事件又傳遞客戶端,客戶端這時纔會真正引起咱們自定義註冊的Watcher。

    上面只是簡單描述了一下,以後的文章會詳細源碼分析整個過程。

DataTree就負責進行node的增刪改查。

咱們知道node的類型分爲四種類型:

  • PERSISTENT:持久型節點
  • PERSISTENT_SEQUENTIAL:持久型順序型節點
  • EPHEMERAL:臨時型節點
  • EPHEMERAL_SEQUENTIAL:臨時型順序型節點

前二者持久型節點和後二者臨時型節點的不一樣之處就在於,一旦當客戶端session過時,則會清除臨時型節點,不會清除持久型節點,除非去執行刪除操做。

而順序型節點,則是每次建立一個節點,會在一個節點路徑的後面加上父節點的cversion版本號(即該父節點的全部子節點一旦發生變化,就會自增該版本號)的格式化形式,以下:

順序型節點來來歷

能夠看到是將父節點的cversion版本號以10進制形式輸出,寬度是10位,不足的話前面補0。因此是在執行DataTree建立node方法以前就已經定好了path路徑的。

再來看下是如何區分持久型和臨時型節點的呢?

在DataTree建立node方法會傳遞一個ephemeralOwner參數,當客戶端選擇的是持久型節點,給出的sessionId爲0,當爲臨時型節點時,給出客戶端的sessionId,以下:

輸入圖片說明

先來看下DataTree建立node方法的方法:

輸入圖片說明

  • 先判斷父節點存不存在,不存在的話,報錯。
  • 而後檢查父節點的全部子節點是否已存在要建立的節點,若是存在報錯。
  • 建立出節點,並存放到DataTree的ConcurrentHashMap<String, DataNode> nodes屬性中,見上文描述
  • 判斷該節點是不是臨時節點,若是是臨時節點,則ephemeralOwner參數即爲客戶端的sessionId。而後以sessionId爲key,存儲該客戶端所建立的全部臨時節點到DataTree的Map<Long, HashSet<String>> ephemerals屬性中,見上文描述

ZKDatabase先暫時介紹到這裏,以後抽出一篇文章單獨介紹DataTree和FileTxnSnapLog。

##3.3 ZooKeeperServer請求處理器鏈介紹 ZooKeeper使用請求處理器鏈的方式來處理請求,先看下請求處理器的定義RequestProcessor:

RequestProcessor定義

從註釋上能夠看到幾個要點:

  • RequestProcessor是以責任鏈的形式來處理事務的。
  • 請求是被順序的進行處理的,單機版、集羣版的Leader、Follower略有不一樣
  • 對於請求的處理,是經過processRequest(Request request)方法來處理的。有些處理器是一個線程,即請求被扔到該線程中進行處理
  • 當調用shutdown時,也會關閉它所關聯的RequestProcessor

來看下ZooKeeperServer請求處理器鏈的具體狀況:

ZooKeeperServer請求處理器鏈

即PrepRequestProcessor-》SyncRequestProcessor-》FinalRequestProcessor

來一個一個具體看看:

###3.3.1 PrepRequestProcessor處理器

主要內容:對請求進行區分是不是事務請求,若是是事務請求則建立出事務請求頭,同時執行一些檢查操做。

大致屬性以下:

PrepRequestProcessor屬性

  • LinkedBlockingQueue<Request> submittedRequests:提交的用戶請求
  • RequestProcessor nextProcessor:下一個請求處理器
  • ZooKeeperServer zks:服務器對象

PrepRequestProcessor所實現的processRequest接口方法即爲:將該請求放入submittedRequests請求隊列中。同時PrepRequestProcessor又是一個線程,在run方法中又會不斷的取出上述用戶提交的請求,進行處理,整個處理過程以下:

建立事務請求頭

對於增刪改等影響數據狀態的操做都被認爲是事務,須要建立出事務請求頭。

只需驗證session

createSession、closeSession也屬於事務操做,而那些獲取數據的操做則不屬於事務操做,只須要驗證下sessionId是否合法等

處理完成以後就交給了下一個處理器繼續處理該請求。

咱們以建立session和建立節點爲例,來具體看下代碼:

先看下建立session,即以下代碼:

session建立處理

首先會爲該request獲取一個事務id即zxid,該zxid的值來自於ZooKeeper服務器的一個hzxid變量,默認是0,每來一個請求就會執行自增操做。

建立事務請求頭

建立session的具體內容

首先獲取客戶端傳遞過來的sessionTimeout時間,而後使用ZooKeeperServer的sessionTracker來建立一個session,同時爲該session的owner屬性賦值,可是對於建立session的request請求,並無爲owner賦值。而是在建立其餘請求的時候纔會爲請求的owner賦值爲本機器。

代碼見證以下:

建立session的request以下:

輸入圖片說明

建立其餘的request的以下:

輸入圖片說明

接下來看看建立一個node的處理:

  • 首先進行的是session檢查。

    session及owner的檢查

    先檢查服務器端該sessionId仍是否存在,若是不存在則表示已通過期,拋出SessionExpiredException異常。若是session存在,owner爲空,則會對owner進行賦值。若是owner存在則進行owner覈對,若是不一致拋出SessionMovedException異常。

    第一次建立session後,該session的owner是爲空的,以後的請求操做owner都是有值的,此時則會爲該session賦值。

    咱們想象下這樣的場景:客戶端鏈接一臺服務器server1,該客戶端擁有的session的owner是server1,客戶端發送操做請求,因爲網絡緣由形成請求阻塞,客戶端認爲server1不穩定,則會拿着剛纔的session去鏈接另外一臺服務器server2,鏈接成功後,該session對應的owner被設置爲null了(根據上文知道建立session的時候,owner會被清空),而後繼續在server2上執行一樣的操做,此時會爲該session的owner屬性賦值爲server2,若是以前對server1的請求此時終於到達server1了,此request的owner是server1,則在檢查的時候,發現該owner不一致,server1則會拋出SessionMovedException異常。即session的owner已經變化了的異常。則會阻止該請求的執行,防止了重複執行相同的操做。這裏再留個疑問:爲何當建立session的時候要清空owner呢?

  • 反序列化出CreateRequest對象,獲取要建立的路徑,同時獲取父路徑,檢查該父路徑是否存在,若是不存在拋出異常NoNodeException。若是存在獲取父路徑的修改記錄,驗證對父路徑是否有修改權限

  • 從CreateRequest中獲取用戶建立的node的類型,若是是臨時節點的話,則根據父路徑的子節點的版本cversion,來生成該臨時節點的路徑後綴部分。而後驗證該路徑是否存在,若是存在則拋出NodeExistsException異常

  • 判斷父節點是不是臨時節點,若是是臨時節點則不該該有子節點,拋出NoChildrenForEphemeralsException異常,這部分代碼該判斷應該是提早應該作的,而不是留到如今纔來判斷

  • 若是該節點是臨時節點,則爲該節點ephemeralOwner屬性設置爲對應的sessionId,若是是永久節點則設置爲0。而DataTree則是依據ephemeralOwner是否爲0,來判斷是不是臨時節點仍是持久節點,若是是臨時節點,則會另外存儲一份數據,以sessionId爲key,即列出了每一個sessionId所包含的全部臨時節點,一旦該sessionId失效,則直接拿出該列表進行清除操做便可,不用再去遍歷全部的節點了。

  • 產生兩條變化記錄,分別是父節點的子節點列表變化的記錄,和要建立的節點的建立記錄。

至此便完成預處理操做。該交給下一個RequestProcessor處理器來處理了

###3.3.2 SyncRequestProcessor處理器

主要對事務請求進行日誌記錄,同時事務請求達到必定次數後,就會執行一次快照。

主要屬性以下:

SyncRequestProcessor屬性

  • ZooKeeperServer zks:ZooKeeper服務器對象

  • LinkedBlockingQueue<Request> queuedRequests:提交的請求(包括事務請求和非事務請求)

  • RequestProcessor nextProcessor:下一個請求處理器

  • Thread snapInProcess:執行一次快照任務的線程

  • LinkedList<Request> toFlush:那些已經被記錄到日誌文件中但還未被flush到磁盤上的事務請求

  • int snapCount:發生了snapCount次的事務日誌記錄,就會執行一次快照

  • int randRoll:上述是一個對全部服務器都統一的配置數據,爲了不全部的服務器在同一時刻執行快照任務,實際狀況爲發生了(snapCount / 2 + randRoll)次的事務日誌記錄,就會執行一次快照。randRoll的計算方式以下:

    r.nextInt(snapCount/2)

接下來就詳細看下SyncRequestProcessor(也是一個線程)的詳細實現:

對於RequestProcessor定義的接口:processRequest(Request request),SyncRequestProcessor和PrepRequestProcessor同樣,都是講請求放入阻塞式隊列中,而後在線程run方法中執行相應的邏輯操做。

首先仍是從LinkedBlockingQueue<Request> queuedRequests隊列中取出一個Request,處理以下:

SyncRequestProcessor處理過程

  • 第一步:將該請求添加到事務日誌中,這一部分會區分Request是事務請求仍是非事務請求,依據就是前一個處理器PrepRequestProcessor爲Request加上的事務請求頭。若是是事務請求,則添加成功後返回true,添加成功即爲將該請求序列化到一個指定的文件中。若是是非事務請求,直接返回false。
  • 第二步:若是是事務請求,添加到事務日誌中後,logCount++,該logCount就是用於記錄已經執行多少次事務請求序列化到日誌中了。
  • 第三步:一旦logCount超過(snapCount / 2 + randRoll)次後,就須要執行一次快照了。
  • 第四步:先將當前的事務日誌記錄flush到磁盤中,而後設置當前流爲null,以便下一次事務日誌記錄從新開啓一個新的文件來記錄
  • 第五步:建立一個ZooKeeperThread線程,用於執行一次快照任務,則會把當前的dataTree和sessionsWithTimeouts信息序列化到一個文件中。
  • 第六七步:若是是非事務請求的話,則會直接交給下一個RequestProcessor處理器來處理。咱們看到這裏還加上了一個toFlush.isEmpty()的判斷,即以前沒有請求遺留,只有在這樣的條件下才會直接交給下一個RequestProcessor處理器來處理,主要是爲了保證請求的順序性。若是以前還有遺留的請求,則後來的請求不能被先處理。

上述的請求除了直接被下一個處理器處理的狀況,其他大部分都會被保存到LinkedList<Request> toFlush中,何時纔會被執行呢?

toFlush中的request被執行1

toFlush中的request被執行2

兩種狀況下會被執行flush:

  • 當request數量超過1000
  • 當沒有請求到來的時候

來看下具體的flush過程:

flush過程

  • 第一步:執行事務日誌文件執行commit操做。上述rollLog操做僅僅是先flush,而後設置當前日誌記錄流爲null,以便下一次從新開啓一個新的事務日誌文件,同時這些流都會被保存到LinkedList<FileOutputStream> streamsToFlush屬性中,commit操做則是先flush這些全部的流,而後執行這些流的close操做。
  • 第二步:即是將請求交給下一個處理器來處理

至此SyncRequestProcessor的內容也完成了,接下來就是下一個請求處理器即FinalRequestProcessor

###3.3.2 FinalRequestProcessor處理器

做爲處理器鏈上的最後一個處理器,負責執行請求的具體任務,前面幾個處理器都是輔助操做,如PrepRequestProcessor爲請求添加事務請求頭和執行一些檢查工做,SyncRequestProcessor也僅僅是把該請求記錄下來保存到事務日誌中。該請求的具體內容,如獲取全部的子節點,建立node的這些具體的操做就是由FinalRequestProcessor來完成的。

下面就來詳細看看FinalRequestProcessor處理request的過程

FinalRequestProcessor的處理內容

  • 對於request是順序執行,要刪除那些zxid小於當前request的zxid的outstandingChanges、以及outstandingChangesForPath 。這裏就有一個疑問:outstandingChanges數據是由PrepRequestProcessor在預處理事務請求頭的時候產生的,他們又被誰來消費呢?他們主要做用是什麼?

  • 接着就是落實具體的事務操做了,如建立節點、刪除節點、設置數據等

來具體看下這個過程:

ZK執行事務過程

這些事務操做分紅兩種狀況,一部分就是針對dataTree的增刪改節點,另外一種就是建立session,關閉session。建立和關閉session都是使用sessionTracker來完成,這一部分以前已經詳細描述過了。下面具體看下針對dataTree的增刪改節點:

DataTree執行事務操做

根據事務請求頭的不一樣類型,分別執行增刪改操做。對於增長節點上面也已經詳細描述過了。

接下來就是開始準備返回值,而後響應給客戶端。以建立session爲例:

響應session的建立

使用sessionTimeout(客戶端傳遞的sessionTimeout和服務器端協商後的),sessionId,根據sessionId獲取的密碼 這些數據構建一個ConnectResponse,而後進行序列化,傳給客戶端,並開始接收客戶端的請求。

上述是session建立成功的時候。即上圖中的valid爲true。何時爲fasle呢?

當你已經建立session了,可是同服務器的鏈接斷開了,而後拿着該session去從新鏈接下一臺服務器,若是密碼是錯誤的,服務器則會這設置valid爲false。若是密碼是正確的,可是在於服務器端已經創建TCP鏈接後,此時該從新激活session了,可是發現該session已通過期了,被服務器端清除了,也會致使valid爲false。

一旦valid爲false,返回給客戶端的sessionTimeout爲0,sessionId爲0,密碼爲空。客戶端在接收到該數據後,看到sessionTimeout爲0,則認爲創建session關聯失敗,發出session過時的異常事件,開始走向死亡,即客戶端的ZooKeeper對象不可用,必需要從新建立一個新的ZooKeeper對象。

##3.4 ServerStats介紹

它是用於統計服務器的運行數據的。

ServerStats屬性

  • packetsSent:服務器端已發送的數據包
  • packetsReceived:服務器端已接收的數據包
  • maxLatency:處理一次請求的最大延遲
  • minLatency:處理一次請求的最小延遲
  • totalLatency:服務器端處理請求的總延遲
  • count:服務器端已經處理的請求數

ZooKeeperServer會建立一個ServerCnxnFactory,即建立了ServerSocket,等待客戶端鏈接。每來一個客戶端的TCP鏈接,ServerCnxnFactory就會爲該鏈接建立一個ServerCnxn,每一個ServerCnxn也會統計上述信息,即單獨針對某個客戶端的數據。而ZooKeeperServer則是統計全部客戶端的上述數據。

#4 單機版服務器啓動概述

上面描述了ZooKeeper服務器的幾個重要數據,下面就概述下單機版服務器的服務過程:

輸入圖片說明

  • 第一步:建立一個ZooKeeperServer,表明着一個服務器對象,同時會建立出ServerStats用於統計服務器運行數據
  • 第二步:根據配置參數dataLogDir和dataDir建立出用於管理事務日誌和快照的對象FileTxnSnapLog,用於從磁盤上恢復數據和將內存數據快照到磁盤上、事務請求記錄到磁盤上。
  • 第三步:對ZooKeeperServer設置一些配置參數,如tickTime、minSessionTimeout、maxSessionTimeout
  • 第四步:建立ServerCnxnFactory,用於建立ServerSocket,等待客戶端的socket鏈接
  • 第五步:啓動ZooKeeperServer服務

當客戶端第一次TCP鏈接ZooKeeper服務器的時候:

  • 上述ServerCnxnFactory會爲該客戶端建立出一個ServerCnxn服務器代理對象,單獨用於處理和該客戶端的通訊,TCP鏈接創建成功

  • TCP鏈接創建成功後,客戶端開始發送ConnectRequest請求,申請sessionId,會傳遞sessionTimeout時間。

  • ServerCnxn接收到該申請後,根據客戶端傳遞過來的sessionTimeout時間以及ZooKeeperServer自己的minSessionTimeout、maxSessionTimeout參數,肯定最終的sessionTimeout時間

  • sessionTimeout肯定好了,就開始判斷客戶端的請求是否已經含有sessionId,若是含有,則執行sessionId的是否過時、密碼是否正確等檢查。若是沒有sessionId,則會使用sessionTracker分配sessionId,建立一個session。該session含有上述肯定的sessionTimeout信息,即每一個session都有各自的sessionTimeout信息。

  • 以後就是構建一個Request請求,該請求的類型就是建立session。而後將該請求交給請求處理器鏈來處理。請求處理器鏈爲PrepRequestProcessor-》SyncRequestProcessor-》FinalRequestProcessor。

  • 首先是PrepRequestProcessor的處理,它對請求分紅事務請求和非事務請求。事務請求即能改變服務器狀態數據的一些操做,即node的增刪改,session的建立關閉等。爲這些事務請求加上事務請求頭,後面的請求處理器都是基因而否含有事務請求頭來判別該請求是不是事務請求。 同時對事務請求進行一些檢查,如session是否過時,session的owner是否一致、節點是否存在等。對於建立session來講,就檢查了下session是否過時。同時會對全部請求都分配zxid。

  • 接下來是SyncRequestProcessor的處理:僅僅對事務請求進行記錄到事務日誌中,一旦事務請求達到必定數量,就會執行一次對內存DataTree數據和session數據的快照。

  • 最後就是FinalRequestProcessor處理器:真正執行數據操做的地方。如爲DataTree添加節點、刪除節點、更改數據、查詢數據等。同時也負責統計整個服務器端處理過程的最大延遲、最小延遲、總延遲、總處理數,每一個ServerCnxn也針對本身的客戶端也進行響應的統計。最後FinalRequestProcessor處理上述事務操做的結果給客戶端。如建立session,則是返回sessionTimeout、sessionId、密碼數據給客戶端。若是建立失敗,則返回的sessionTimeout爲0,sessionId爲0,密碼爲空。

客戶端對服務器端響應的結果的處理:

  • 首先判斷服務器端返回的sessionTimeout是否小於等於0

    若是是,則認爲申請sessionId失敗,向eventThread中添加了兩個事件。首先是session過時的事件即KeeperState.Expired,客戶端的默認Watcher接收到以後,必須自行採起響應的處理操做(如從新建立一個ZooKeeper對象),由於客戶端的ZooKeeper對象即將失效了。第二個事件就是一個死亡事件,eventThread遇到該事件後,就會跳出事件循環,eventThread線程走向結束。

    若是不是,則認爲申請sessionId成功。則保存服務器端給出的sessionId、密碼、sessionTimeout數據,重置客戶端的readTimeout、connectTimeout。而後經過eventThread發送一個成功鏈接的事件即KeeperState.SyncConnected,客戶端在接收到該事件後就能夠執行相應的操做了。

相關文章
相關標籤/搜索