zookeeper原理解析-客戶端與服務器端交互

Zookeeper集羣中server數量老是肯定的,因此集羣中的server交互採用比較可靠的bio長鏈接模型;不一樣於集羣中sever間交互zookeeper客戶端其實數量是未知的,爲了提升zookeeper併發性能,zookeeper客戶端與服務器端交互採用nio模型。下面咱們主要來說講zookeeper的服務器端與客戶端的交互。讀者對nio不瞭解的話不妨抽點時間去了解下,對於一些nio框架如netty,mina再如一些web容器如tomcat,jetty底層都實現一套nio框架,對於實現nio框架模型你們不妨去谷歌百度搜一下Doug Lea的scalable io in Java 這個ppt。java

 

客戶端web

ClientCnxnSocketNIO是zookeeper的nio通信層的客戶端部分,下面僞代碼示例其核心代碼:數組

ClientCnxnSocketNIO{tomcat

      doTransport() {服務器

               if (若是以前鏈接沒有立馬連上,則在這裏處理OP_CONNECT事件) {併發

                   sendThread.primeConnection();框架

               } else {異步

                    doIO工具

                }性能

 

             //隊列中有發送的消息, 開啓寫

        }

 

       doIO() {

              if (sockKey.isReadable()) {

                    sendThread.readResponse(incomingBuffer);

                    updateLastHeard();

               } 

 

               if(sockKey.isWritable()) {

                      Packetp = outgoingQueue.getFirst() //從發送隊列取

                      updateLastSend

                      p.requestHeader.setXid(cnxn.getXid());//設置客戶端的xid

                     序列化

                     發送

                     從發送隊列刪除

                     加入到pendingQueue隊列

                }

        }

}

 

ClientCnxn 是客戶端操做ClientCnxnSocketNIO的工具,維護了發送任務線程SendThread,事件任務線程EventThead, 發送隊列OutgoingQueue以及請求消息的等待隊列PendingQueue。下面以僞代碼來示例其核心代碼

ClientCnxn {

    outgoingQueue//待向服務器端發送的隊列, 客戶端提交請求放入這個隊列

    pendingQueue //發送之後等待響應的隊列,

    

    submitRequest(){

       //client端一個封裝成一個packet

        outgoingQueue.add(packet);

        selector.wakeup();

        packet.wait(); //若是是同步調用wait,應該反饋後會

    }

 

   SendThread {

       run() {

           1.設置clientCnxnSocket 最後發送時間,最後的心跳時間

           2. if(!clientCnxnSocket.isConnected()) {

                    startConnect()  //主要工做clientCnxnSocket作

              } else {

                    計算下次ping的時間, 發送心跳

                  委託給 clientCnxnSocket.doTranspor進行底層的nio傳輸

               } 

         }

 

        primeConnection(){

           //構建ConnectRequest

           //組合成通信層的Packet對象,添加到發送隊列,對於ConnectRequest其requestHeader爲null

            outgoingQueue.addFirst

            clientCnxnSocket.enableReadWriteOnly();//確保讀寫事件都監聽 

        }

 

        readResponse(){

           1.先讀響應頭,先對特殊的xid進行處理

           2. packet = pendingQueue.remove() //因爲client和server都是單線程處理,多隊列處理,因此認爲全局有序

           3. 反序列化響應體response, 並設置到packet上

           4.finishPacket 1)同步notifyAll,結束 2)異步加入到event線程的隊列

        }

    }

 

    EventThread{ //主要支持異步的回調

       run() {

 

        }

    }

}

 

你們觀察客戶端操做類Zookeeper裏面的操做類主要分爲兩個參數不帶callback的同步方法和參數帶callback的異步方法。

1.      同步調用方法實現相似Future同步轉異步模式實現

1)  Client提交請求對象封裝成packet對象放入OutgoingQueue隊列,並調用packet.wait()阻塞當前線程。

2)  每一個Client都只有一個SendThread線程是線性處理OutgoingQueue中的請求消息的,SendThread線程經過ClientCnxnSocketNIO工具順序從OutgoingQueue隊列中取請求消息發送到服務器端,同時將請求packet加入到PendingQueue中

3)  ClientCnxnSocketNIO工具接收處理服務器端響應

4)  從PendingQueue隊列取出對應的packet,並調packet.notifyAll()喚醒阻塞的線程完成同步調用

2.      異步調用的整體流程跟同步相似關鍵區別在於

1)  向OutgoingQueue隊列提交請求後,不會調用packet.wait()阻塞當前線程,主流程繼續執行

2)  同同步調用

3)  同同步調用

4)  從PendingQueue隊列取出對應的packet,並調packet.callback方法完成回調處理

 

 

 

Zookeeper服務器端

NIOServerCnxnFactory工廠類,zookeeperserver用來啓動監聽客戶端鏈接,每當有客戶端請求鏈接進來,NIOServerCnxnFactory都會爲這個連接構建NIOServerCnxn 實例來單獨處理與這個客戶端的交互

NIOServerCnxn封裝了處理讀取客戶端請求數據與及向客戶端響應數據

下面經過僞代碼來實例:

NIOServerCnxnFactory  {

   configure {

        綁定端口

        做爲server監聽

        註冊selectkey 的鏈接時間

    }

 

    run {  //起到accept的做用

       1. OP_ACCEPT, 將NIOServerCnxn(handler) attach到selectkey以便被讀寫事件使用

       2. OP_READ 和 OP_WRITE取出handler NIOServerCnxn,並調doIo

    }

}

 

 

NIOServerCnxn {

 

    構造器 {

        //設置selectkey對read感興趣

    }

 

    doIo {

        if(k.isReadable()) {

             1. 讀前四個字節, 表明請求內容長度,不包括本身的4字節

             2. 讀取到字節數組中

             3. zkServer.processPacket()或者zkServer.processConnectRequest()

        }

 

        if(k.isWritable()) {

              1.從outgoingBuffers取ByteBuffer

              2.發送bytes

        }

    }

}

相關文章
相關標籤/搜索