ZooKeeper (4): Networking

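  ZooKeeper uses NIO for its low-level network transport. If you are not familiar with NIO, read up on Java NIO first. Below we walk step by step through ZooKeeper's NIO-based client implementation, starting with the application-layer protocol.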

Transport protocol

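  Request format:

  Section         Field    Length (bytes)  Description
                  len      4               Request length; lets the receiver cut the byte stream into frames despite partial or coalesced TCP reads
  Request header  xid      4               Client-side request id; uniquely identifies the request and keeps responses in order
                  type     4               Request type code
  Request body    request  variable        Structure depends on the request type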

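  Response format:

  Section          Field     Length (bytes)  Description
                   len       4               Response length; lets the receiver cut the byte stream into frames despite partial or coalesced TCP reads
  Response header  xid       4               Packet sequence number; uniquely identifies a packet and is used to find the matching client-side Packet object
                   zxid      8               Server-side transaction id
                   err       4               Return status code
  Response body    response  variable        Structure depends on the response type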
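
  To make the two layouts concrete, here is a minimal sketch that builds a request frame and parses a response header by hand with ByteBuffer. FrameCodec is a hypothetical helper, not a ZooKeeper class, and the body encoding (a 4-byte length followed by UTF-8 bytes for a string, one byte for a boolean) is an assumption about the serialization that the real client delegates to its Record classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of ZooKeeper.
final class FrameCodec {
    // Builds: len | xid | type | path | watch  (an "exists"-style request)
    static ByteBuffer encodeExists(int xid, String path, boolean watch) {
        byte[] p = path.getBytes(StandardCharsets.UTF_8);
        int bodyLen = 4 + 4 + (4 + p.length) + 1; // xid + type + string + boolean
        ByteBuffer bb = ByteBuffer.allocate(4 + bodyLen);
        bb.putInt(bodyLen);   // len does not count its own 4 bytes
        bb.putInt(xid);       // request header: xid
        bb.putInt(3);         // request header: type 3 = exists
        bb.putInt(p.length);  // assumed string encoding: length prefix + bytes
        bb.put(p);
        bb.put((byte) (watch ? 1 : 0));
        bb.flip();            // switch from writing to reading
        return bb;
    }

    // Reads len plus the fixed 16-byte response header: xid(4) zxid(8) err(4)
    static long[] decodeReplyHeader(ByteBuffer frame) {
        int len = frame.getInt();
        int xid = frame.getInt();
        long zxid = frame.getLong();
        int err = frame.getInt();
        return new long[] { len, xid, zxid, err };
    }
}
```

  For example, encodeExists(7, "/a", true) yields a 19-byte frame whose len field is 15. ByteBuffer is big-endian by default, which matches network byte order.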

  Request and response body types:

  Code  Type           Description                  Request body                               Response body
  0     notification
  1     create         Create a node                String path (path)                         String path (path)
                                                    byte[] data (node value)
                                                    List<ACL> acl (ACL)
                                                    int flags (node type)
  2     delete         Delete a node                String path (path)                         none
                                                    int version (version)
  3     exists         Check whether a node exists  String path (path)                         Stat stat (node state)
                                                    boolean watch (whether to set a watch)
  4     getData        Get node data                String path (path)                         Stat stat (node state)
                                                    boolean watch (whether to set a watch)     byte[] data (node data)
  5     setData        Set node data                String path (path)                         Stat stat (node state)
                                                    byte[] data (data)
                                                    int version (version)
  6     getACL         Get node permissions         String path (path)                         List<ACL> acl (permissions)
                                                                                               Stat stat (node state)
  7     setACL         Set node permissions         String path (path)                         Stat stat (node state)
                                                    List<ACL> acl (permissions)
                                                    int version (version)
  8     getChildren    Get child nodes              String path (path)                         List<String> children (child node names)
                                                    boolean watch (whether to set a watch)
  9     sync           Sync a path                  String path (path)                         String path (path)
  11    ping
  12    getChildren2
  100   auth
  101   setWatches
  -10   createSession
  -11   closeSession
  -1    error

       

 

 

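  The client handles a request in four steps:

  1. The request is wrapped in a Packet object and placed on the outgoingQueue.

  2. A send/receive thread takes a sendable Packet from the outgoingQueue, sends its serialized form, and then moves the Packet to the pendingQueue.

  3. The same thread receives the server's response, deserializes the result, finds the matching Packet in the pendingQueue, sets the result on it, and places the Packet on the waitingEvents queue.

  4. An event thread keeps taking Packets from the waitingEvents queue and invokes the corresponding callback.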
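
  The four steps can be sketched with three plain queues. This is a simplified model rather than the ClientCnxn code: PacketPipeline and its Packet are hypothetical, requests and responses are plain strings, and no socket is involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the client pipeline; names mirror the queues in the text.
final class PacketPipeline {
    static final class Packet {
        final int xid;
        final String request;
        String response;
        Packet(int xid, String request) { this.xid = xid; this.request = request; }
    }

    final Deque<Packet> outgoingQueue = new ArrayDeque<>();
    final Deque<Packet> pendingQueue = new ArrayDeque<>();
    final LinkedBlockingQueue<Packet> waitingEvents = new LinkedBlockingQueue<>();

    // Step 1: the caller wraps the request and queues it for sending.
    void submit(Packet p) {
        synchronized (outgoingQueue) { outgoingQueue.add(p); }
    }

    // Step 2: the I/O thread "sends" the head packet and parks it as pending.
    Packet sendNext() {
        Packet p;
        synchronized (outgoingQueue) { p = outgoingQueue.poll(); }
        if (p != null) {
            synchronized (pendingQueue) { pendingQueue.add(p); }
        }
        return p;
    }

    // Step 3: a reply arrives; replies come back in order, so the oldest
    // pending packet must be the one with the same xid.
    void onResponse(int xid, String response) {
        Packet p;
        synchronized (pendingQueue) { p = pendingQueue.remove(); }
        if (p.xid != xid) throw new IllegalStateException("reply out of order");
        p.response = response;
        waitingEvents.add(p);
    }

    // Step 4: the event thread blocks here until a finished packet is ready.
    Packet nextEvent() throws InterruptedException {
        return waitingEvents.take();
    }
}
```

  Because the server answers requests from one connection in order, the pending queue can be a simple FIFO and the xid only needs to be checked, not searched for.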

Sending a packet

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        
        Packet packet = null;
        synchronized (outgoingQueue) {
            //assign the next client-side xid as the packet id (ping and auth use fixed xids)
            if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
                h.setXid(getXid());
            }
            //wrap the request header, request body, response holder, watcher, etc. into a packet
            packet = new Packet(h, r, request, response, null,
                    watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            //add the packet to the outgoing queue
            outgoingQueue.add(packet);
        }
        sendThread.wakeup();
        return packet;
    }

Main loop of the send thread (ClientCnxn.SendThread.run):

class SendThread extends Thread {
        SelectionKey sockKey;
        private final Selector selector = Selector.open();
        public void run() {
            while (zooKeeper.state.isAlive()) {
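                //connect (or reconnect) only when there is no live socket
                if (sockKey == null) {
                    startConnect();
                }
                //wait up to 1 second for a registered channel to become ready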
                selector.select(1000);
                Set<SelectionKey> selected;
                synchronized (this) {
                    selected = selector.selectedKeys();
                }
                for (SelectionKey k : selected) {
                    SocketChannel sc = ((SocketChannel) k.channel());
                    if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                        //finish establishing the connection
                        if (sc.finishConnect()) {
                            primeConnection(k);
                        }
                        //read or write data
                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        doIO();
                    }
                }
                //clear the handled keys so they are not processed again on the next pass
                selected.clear();
            }
            try {
                selector.close();
            } catch (IOException e) {
                LOG.warn("Ignoring exception during selector close", e);
            }
        }

        //open a non-blocking connection via NIO
        private void startConnect() throws IOException {
            //pick the next server address from the server list (round-robin)
            InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
            nextAddrToTry++;
            if (nextAddrToTry == serverAddrs.size()) {
                nextAddrToTry = 0;
            }
            //open the channel and register it with the selector
            SocketChannel sock;
            sock = SocketChannel.open();
            sock.configureBlocking(false);
            sock.socket().setSoLinger(false, -1);
            sock.socket().setTcpNoDelay(true);
            try {
                sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
            } catch (IOException e) {
                sock.close();
                throw e;
            }
            //reset the buffers: the next thing to read is a 4-byte length
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
}

Main read/write flow, mostly NIO operations (ClientCnxn.SendThread.doIO):

boolean doIO() throws InterruptedException, IOException {
            boolean packetReceived = false;
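            //get the SocketChannel
            SocketChannel sock = (SocketChannel) sockKey.channel();
            //if the channel is readable
            if (sockKey.isReadable()) {
                //read data into the current buffer
                int rc = sock.read(incomingBuffer);
                //a negative count means the server closed the connection
                if (rc < 0) {
                    throw new IOException("Unable to read additional data from server");
                }
                //continue only once the buffer has been filled completely
                if (!incomingBuffer.hasRemaining()) {
                    //flip the buffer so it can be read from the start
                    incomingBuffer.flip();
                    //if this was the 4-byte length, read it and allocate a buffer for the body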
                    if (incomingBuffer == lenBuffer) {
                        int len = incomingBuffer.getInt();
                        incomingBuffer = ByteBuffer.allocate(len);
                    } else if (!initialized) {
                        //reply to the connect request: read session id, timeout and related info
                        readConnectResult();
                        enableRead();
                        lenBuffer.clear();
                        //go back to reading a length
                        incomingBuffer = lenBuffer;
                        initialized = true;
                    } else {
                        //read the response body
                        readResponse();
                        //go back to reading a length
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        packetReceived = true;
                    }
                }
            }
            //if the channel is writable
            if (sockKey.isWritable()) {
                synchronized (outgoingQueue) {
                    if (!outgoingQueue.isEmpty()) {
                        //write the first packet of the outgoingQueue to the channel
                        ByteBuffer pbb = outgoingQueue.getFirst().bb;
                        sock.write(pbb);
                        if (!pbb.hasRemaining()) {
                            sentCount++;
                            Packet p = outgoingQueue.removeFirst();
                            if (p.header != null
                                    && p.header.getType() != OpCode.ping
                                    && p.header.getType() != OpCode.auth) {
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                disableWrite();
            } else {
                enableWrite();
            }
            return packetReceived;
        }
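
The read half of doIO is a two-state machine: fill a 4-byte length buffer, then fill a body buffer of exactly that size, then start over. The sketch below isolates that idea so it can run without a socket. FrameReader is a hypothetical class, and feed() stands in for sock.read() by copying bytes from whatever chunk happens to arrive.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, socket-free model of the lenBuffer/incomingBuffer switch in doIO.
final class FrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;
    final List<byte[]> frames = new ArrayList<>();

    // A chunk may hold part of a frame, one frame, or several frames.
    void feed(ByteBuffer chunk) {
        while (true) {
            // Copy as much as fits into the buffer currently being filled.
            while (chunk.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(chunk.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return; // partial length or partial body: wait for more bytes
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // The length is complete: allocate a buffer for the body.
                int len = incomingBuffer.getInt();
                if (len < 0) throw new IllegalArgumentException("bad length " + len);
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                // The body is complete: emit it and go back to reading a length.
                frames.add(incomingBuffer.array());
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
    }
}
```

Feeding the same byte stream in arbitrary slices produces the same frames, which is the point of the length prefix.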

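Handling the response: xid=-2 is the reply to a ping; xid=-4 is the reply to an auth request; xid=-1 is a notification sent by the server; any other xid is the reply to an ordinary request.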
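
As a compact illustration of that dispatch rule, the following sketch classifies a reply by its xid. ReplyKind and its constant names are hypothetical; only the numeric values come from the text above.

```java
// Hypothetical classifier; the three special xid values are taken from the text.
final class ReplyKind {
    static final int NOTIFICATION_XID = -1;
    static final int PING_XID = -2;
    static final int AUTH_XID = -4;

    static String classify(int xid) {
        switch (xid) {
            case PING_XID:         return "ping";
            case AUTH_XID:         return "auth";
            case NOTIFICATION_XID: return "notification";
            default:               return "reply"; // matches a pending Packet
        }
    }
}
```

Only the last category consumes an entry from the pendingQueue, which is why pings and auth packets are never added to it in doIO.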

void readResponse() throws IOException {
            //deserialize the reply header from the received data
            ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();
            replyHdr.deserialize(bbia, "header");
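            //dispatch on the reply header: special xids are handled first, and
            //resulting events are queued for the EventThread
            //-2 is the xid for ping replies; there is nothing to deliver
            if (replyHdr.getXid() == -2) {
                return;
            }
            //-4 is the xid for auth replies; on failure queue an AuthFailed event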
            if (replyHdr.getXid() == -4) {
                 // -4 is the xid for AuthPacket               
                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    zooKeeper.state = States.AUTH_FAILED;                    
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                            Watcher.Event.KeeperState.AuthFailed, null) );                                        
                }
                return;
            }
            //-1 is the xid for server notifications (watch events)
            if (replyHdr.getXid() == -1) {
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else
                        event.setPath(serverPath.substring(chrootPath.length()));
                }
                WatchedEvent we = new WatchedEvent(event);
                eventThread.queueEvent( we );
                return;
            }
            //ordinary reply: take the oldest pending packet and deserialize the result into it
            Packet packet = null;
            synchronized (pendingQueue) {
                packet = pendingQueue.remove();
            }
            try {
                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }
            } finally {
                finishPacket(packet);
            }
        }
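
Completing a packet (finishPacket): a synchronous caller waiting on the packet is woken up, while a packet that carries a callback is handed to the event thread:
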
private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }
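
The cb == null branch relies on the caller blocking on the Packet's monitor until finished becomes true. A minimal sketch of that handshake follows; SyncWait and its Packet are hypothetical stand-ins, and a second thread plays the role of the I/O thread.

```java
// Hypothetical stand-in for the synchronous wait that pairs with finishPacket.
final class SyncWait {
    static final class Packet {
        boolean finished;
        String result;
    }

    // Caller side: block until the I/O thread marks the packet finished.
    static String await(Packet p) throws InterruptedException {
        synchronized (p) {
            while (!p.finished) { // loop guards against spurious wakeups
                p.wait();
            }
            return p.result;
        }
    }

    // I/O thread side: publish the result and wake every waiter.
    static void finish(Packet p, String result) {
        synchronized (p) {
            p.result = result;
            p.finished = true;
            p.notifyAll();
        }
    }

    // Runs both sides once and returns what the caller observed.
    static String demo() {
        Packet p = new Packet();
        Thread io = new Thread(() -> finish(p, "done"));
        io.start();
        try {
            return await(p);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Checking the flag inside the synchronized block means the wakeup cannot be missed even if finish() runs before await() starts waiting.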

The event thread mainly runs the callbacks (ClientCnxn.EventThread.run):

public void run() {
           try {
              isRunning = true;
              while (true) {
                  //take the next event from the queue (blocks while it is empty)
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                     //process the event
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }
            LOG.info("EventThread shut down");
        }
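
The eventOfDeath object is a sentinel (often called a poison pill): shutdown is requested by queueing it like any other event, and the thread exits once it has seen the sentinel and the queue is empty. A small self-contained sketch of the same pattern, with the hypothetical MiniEventThread recording events instead of invoking callbacks:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical miniature of the event loop; it records events instead of dispatching them.
final class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object();
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<Object> processed = Collections.synchronizedList(new ArrayList<>());

    void queueEvent(Object event) { waitingEvents.add(event); }
    void queueEventOfDeath() { waitingEvents.add(EVENT_OF_DEATH); }

    @Override
    public void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;     // remember the request, keep draining
                } else {
                    processed.add(event); // stands in for processEvent(event)
                }
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Queues two events and the sentinel, then waits for the thread to exit.
    static List<Object> demo() {
        MiniEventThread t = new MiniEventThread();
        t.start();
        t.queueEvent("a");
        t.queueEvent("b");
        t.queueEventOfDeath();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return t.processed;
    }
}
```

Routing the shutdown request through the same queue keeps it ordered with the events, so everything queued before it is still delivered.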

Processing callbacks and watchers

private void processEvent(Object event) {
          try {
              //watch event: notify every registered watcher
              if (event instanceof WatcherSetEventPair) {
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
              } else {
                  //otherwise the event is a finished Packet: invoke its async callback
                  Packet p = (Packet) event;
                  int rc = 0;
                  String clientPath = p.clientPath;
                  if (p.replyHeader.getErr() != 0) {
                      rc = p.replyHeader.getErr();
                  }
                  if (p.response instanceof ExistsResponse
                          || p.response instanceof SetDataResponse
                          || p.response instanceof SetACLResponse) {
                      StatCallback cb = (StatCallback) p.cb;
                      if (rc == 0) {
                          if (p.response instanceof ExistsResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((ExistsResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetDataResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetDataResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetACLResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetACLResponse) p.response)
                                              .getStat());
                          }
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetDataResponse) {
                      DataCallback cb = (DataCallback) p.cb;
                      GetDataResponse rsp = (GetDataResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getData(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetACLResponse) {
                      ACLCallback cb = (ACLCallback) p.cb;
                      GetACLResponse rsp = (GetACLResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getAcl(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetChildrenResponse) {
                      ChildrenCallback cb = (ChildrenCallback) p.cb;
                      GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetChildren2Response) {
                      Children2Callback cb = (Children2Callback) p.cb;
                      GetChildren2Response rsp = (GetChildren2Response) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null, null);
                      }
                  } else if (p.response instanceof CreateResponse) {
                      StringCallback cb = (StringCallback) p.cb;
                      CreateResponse rsp = (CreateResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx,
                                  (chrootPath == null
                                          ? rsp.getPath()
                                          : rsp.getPath()
                                    .substring(chrootPath.length())));
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.cb instanceof VoidCallback) {
                      VoidCallback cb = (VoidCallback) p.cb;
                      cb.processResult(rc, clientPath, p.ctx);
                  }
              }
          } catch (Throwable t) {
              LOG.error("Caught unexpected throwable", t);
          }
       }

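Packet structure

  Packet is the unit ClientCnxn uses internally to manage a request. It consists of the following fields:

  1.RequestHeader header: the request header

  2.Record request: the request body

  3.ByteBuffer bb: the serialized bytes that are actually sent

  4.ReplyHeader replyHeader: the response header

  5.Record response: the response body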

  6.String clientPath

  7.String serverPath 

  8.boolean finished

  9.AsyncCallback cb

  10.Object ctx 

  11.WatchRegistration watchRegistration 

Packet(RequestHeader header, ReplyHeader replyHeader, Record record,
                Record response, ByteBuffer bb,
                WatchRegistration watchRegistration) {
            this.header = header;
            this.replyHeader = replyHeader;
            this.request = record;
            this.response = response;
            if (bb != null) {
                this.bb = bb;
            } else {
                try {
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive
                            .getArchive(baos);
                    boa.writeInt(-1, "len"); // We'll fill this in later
                    header.serialize(boa, "header");
                    if (record != null) {
                        record.serialize(boa, "request");
                    }
                    baos.close();
                    this.bb = ByteBuffer.wrap(baos.toByteArray());
                    this.bb.putInt(this.bb.capacity() - 4);
                    this.bb.rewind();
                } catch (IOException e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
            this.watchRegistration = watchRegistration;
        }
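
The constructor uses a common trick for length-prefixed frames: write a placeholder where the length goes, serialize the rest, and then overwrite the placeholder once the total size is known. Here is a minimal sketch of that trick alone, with DataOutputStream standing in for BinaryOutputArchive and a header of just xid and type. LengthBackfill is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical illustration of the placeholder-then-backfill length prefix.
final class LengthBackfill {
    static ByteBuffer frame(int xid, int type) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(-1);   // placeholder for len, filled in below
            out.writeInt(xid);  // request header
            out.writeInt(type);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        // A fresh wrap starts at position 0, so this overwrites the placeholder.
        bb.putInt(bb.capacity() - 4); // len excludes its own 4 bytes
        bb.rewind();                  // back to position 0, ready to be written out
        return bb;
    }
}
```

For instance, frame(1, 11) returns a 12-byte buffer whose first int is 8, the size of the two header fields.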