Zookeeper集羣中server數量老是肯定的,因此集羣中的server交互採用比較可靠的bio長鏈接模型;不一樣於集羣中sever間交互zookeeper客戶端其實數量是未知的,爲了提升zookeeper併發性能,zookeeper客戶端與服務器端交互採用nio模型。下面咱們主要來說講zookeeper的服務器端與客戶端的交互。讀者對nio不瞭解的話不妨抽點時間去了解下,對於一些nio框架如netty,mina再如一些web容器如tomcat,jetty底層都實現一套nio框架,對於實現nio框架模型你們不妨去谷歌百度搜一下Doug Lea的scalable io in Java 這個ppt。java
客戶端web
ClientCnxnSocketNIO是zookeeper的nio通信層的客戶端部分,下面僞代碼示例其核心代碼:數組
ClientCnxnSocketNIO{tomcat
doTransport() {服務器
if (若是以前鏈接沒有立馬連上,則在這裏處理OP_CONNECT事件) {併發
sendThread.primeConnection();框架
} else {異步
doIO工具
}性能
//隊列中有發送的消息, 開啓寫
}
doIO() {
if (sockKey.isReadable()) {
sendThread.readResponse(incomingBuffer);
updateLastHeard();
}
if(sockKey.isWritable()) {
Packetp = outgoingQueue.getFirst() //從發送隊列取
updateLastSend
p.requestHeader.setXid(cnxn.getXid());//設置客戶端的xid
序列化
發送
從發送隊列刪除
加入到pendingQueue隊列
}
}
}
ClientCnxn 是客戶端操做ClientCnxnSocketNIO的工具,維護了發送任務線程SendThread,事件任務線程EventThead, 發送隊列OutgoingQueue以及請求消息的等待隊列PendingQueue。下面以僞代碼來示例其核心代碼
ClientCnxn {
outgoingQueue//待向服務器端發送的隊列, 客戶端提交請求放入這個隊列
pendingQueue //發送之後等待響應的隊列,
submitRequest(){
//client端一個封裝成一個packet
outgoingQueue.add(packet);
selector.wakeup();
packet.wait(); //若是是同步調用wait,應該反饋後會
}
SendThread {
run() {
1.設置clientCnxnSocket 最後發送時間,最後的心跳時間
2. if(!clientCnxnSocket.isConnected()) {
startConnect() //主要工做clientCnxnSocket作
} else {
計算下次ping的時間, 發送心跳
委託給 clientCnxnSocket.doTranspor進行底層的nio傳輸
}
}
primeConnection(){
//構建ConnectRequest
//組合成通信層的Packet對象,添加到發送隊列,對於ConnectRequest其requestHeader爲null
outgoingQueue.addFirst
clientCnxnSocket.enableReadWriteOnly();//確保讀寫事件都監聽
}
readResponse(){
1.先讀響應頭,先對特殊的xid進行處理
2. packet = pendingQueue.remove() //因爲client和server都是單線程處理,多隊列處理,因此認爲全局有序
3. 反序列化響應體response, 並設置到packet上
4.finishPacket 1)同步notifyAll,結束 2)異步加入到event線程的隊列
}
}
EventThread{ //主要支持異步的回調
run() {
}
}
}
你們觀察客戶端操做類Zookeeper裏面的操做類主要分爲兩個參數不帶callback的同步方法和參數帶callback的異步方法。
1. 同步調用方法實現相似Future同步轉異步模式實現
1) Client提交請求對象封裝成packet對象放入OutgoingQueue隊列,並調用packet.wait()阻塞當前線程。
2) 每一個Client都只有一個SendThread線程是線性處理OutgoingQueue中的請求消息的,SendThread線程經過ClientCnxnSocketNIO工具順序從OutgoingQueue隊列中取請求消息發送到服務器端,同時將請求packet加入到PendingQueue中
3) ClientCnxnSocketNIO工具接收處理服務器端響應
4) 從PendingQueue隊列取出對應的packet,並調packet.notifyAll()喚醒阻塞的線程完成同步調用
2. 異步調用的整體流程跟同步相似關鍵區別在於
1) 向OutgoingQueue隊列提交請求後,不會調用packet.wait()阻塞當前線程,主流程繼續執行
2) 同同步調用
3) 同同步調用
4) 從PendingQueue隊列取出對應的packet,並調packet.callback方法完成回調處理
Zookeeper服務器端
NIOServerCnxnFactory工廠類,zookeeperserver用來啓動監聽客戶端鏈接,每當有客戶端請求鏈接進來,NIOServerCnxnFactory都會爲這個連接構建NIOServerCnxn 實例來單獨處理與這個客戶端的交互
NIOServerCnxn封裝了處理讀取客戶端請求數據與及向客戶端響應數據
下面經過僞代碼來實例:
NIOServerCnxnFactory {
configure {
綁定端口
做爲server監聽
註冊selectkey 的鏈接時間
}
run { //起到accept的做用
1. OP_ACCEPT, 將NIOServerCnxn(handler) attach到selectkey以便被讀寫事件使用
2. OP_READ 和 OP_WRITE取出handler NIOServerCnxn,並調doIo
}
}
NIOServerCnxn {
構造器 {
//設置selectkey對read感興趣
}
doIo {
if(k.isReadable()) {
1. 讀前四個字節, 表明請求內容長度,不包括本身的4字節
2. 讀取到字節數組中
3. zkServer.processPacket()或者zkServer.processConnectRequest()
}
if(k.isWritable()) {
1.從outgoingBuffers取ByteBuffer
2.發送bytes
}
}
}