zookeeper 底層通過 NIO 進行網絡傳輸,如果對 NIO 不是很熟悉,請先參見 Java NIO 相關資料。下面我們來逐步剖析 zookeeper 基於 NIO 的實現,首先要定義應用層的通訊協議。
傳輸協議
請求協議格式:

| 部分 | 字段 | 長度(字節) | 說明 |
|---|---|---|---|
| | len | 4 | 請求長度,用於解決半包、粘包問題 |
| 請求頭 | xid | 4 | 請求在客戶端的 id,用於唯一標識請求並保證響應順序 |
| 請求頭 | type | 4 | 請求類型 code |
| 請求體 | request | 不定長 | 不同的請求類型有不同的請求體結構 |
響應協議格式:

| 部分 | 字段 | 長度(字節) | 說明 |
|---|---|---|---|
| | len | 4 | 響應長度,用於解決半包、粘包問題 |
| 響應頭 | xid | 4 | 包序號,唯一標識一個包,通過該標識找到對應的客戶端 Packet 對象 |
| 響應頭 | zxid | 8 | 服務端事務處理 id |
| 響應頭 | err | 4 | 返回狀態 code |
| 響應體 | response | 不定長 | 不同的響應類型有不同的響應體 |
請求和響應體類型:
| 請求 code | 請求類型 | 說明 | 請求格式(字段:說明) | 響應格式(字段:說明) |
|---|---|---|---|---|
| 0 | notification | | | |
| 1 | create | 建立節點 | `String path`:路徑<br>`byte[] data`:節點值<br>`List<ACL> acl`:acl 值<br>`int flags`:節點類型 | `String path`:路徑 |
| 2 | delete | 刪除節點 | `String path`:路徑<br>`int version`:版本 | 無 |
| 3 | exists | 是否存在指定節點 | `String path`:路徑<br>`boolean watch`:是否有 watch | `Stat stat`:節點狀態 |
| 4 | getData | 獲取節點數據 | `String path`:路徑<br>`boolean watch`:是否有 watch | `Stat stat`:節點狀態<br>`byte[] data`:節點數據 |
| 5 | setData | 設置節點數據 | `String path`:路徑<br>`byte[] data`:數據<br>`int version`:版本 | `Stat stat`:節點狀態 |
| 6 | getACL | 獲取節點權限 | `String path`:路徑 | `List<ACL> acl`:權限<br>`Stat stat`:節點狀態 |
| 7 | setACL | 設置節點權限 | `String path`:路徑<br>`List<ACL> acl`:權限<br>`int version`:版本 | `Stat stat`:節點狀態 |
| 8 | getChildren | 獲取子節點 | `String path`:路徑<br>`boolean watch`:是否有 watch | `List<String> children`:子節點名 |
| 9 | sync | 同步路徑 | `String path`:路徑 | `String path`:路徑 |
| 11 | ping | | | |
| 12 | getChildren2 | | | |
| 100 | auth | | | |
| 101 | setWatches | | | |
| -10 | createSession | | | |
| -11 | closeSession | | | |
| -1 | error | | | |
1. 將請求封裝成 Packet 對象,放入 outgoingQueue 隊列中。
2. 發送線程(SendThread)從 outgoingQueue 隊列中取出一個可發送的 Packet 對象,發送其序列化後的內容,然後把該 Packet 放入 pendingQueue 隊列中(ping 和 auth 包除外)。
3. 發送線程同時負責接收服務端響應:反序列化出結果數據,然後從 pendingQueue 中取出對應的 Packet 並設置結果;若該 Packet 帶有 callback,則將其交給事件線程的 waitingEvents 隊列,否則直接喚醒等待該 Packet 的同步調用線程。
4. 事件線程(EventThread)不斷從 waitingEvents 隊列中取出 Packet,並調用相應的 callback 方法。
發送packet包
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { Packet packet = null; synchronized (outgoingQueue) { //設置一個全局惟一的id,做爲數據包的id if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) { h.setXid(getXid()); } //將請求頭,請求體,返回結果,watcher等封裝成數據包。 packet = new Packet(h, r, request, response, null, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; //將數據包添加到outgoing隊列中。 outgoingQueue.add(packet); } sendThread.wakeup(); return packet; }
發送線程主流程(ClientCnxn.SendThread.run):
class SendThread extends Thread { SelectionKey sockKey; private final Selector selector = Selector.open(); public void run() { while (zooKeeper.state.isAlive()) { //創建鏈接 startConnect(); //獲取註冊通道 selector.select(1000); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { //創建鏈接 if (sc.finishConnect()) { primeConnection(k); } //讀寫數據 } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(); } } } try { selector.close(); } catch (IOException e) { LOG.warn("Ignoring exception during selector close", e); } } //經過nio創建鏈接 private void startConnect() throws IOException { //從服務器列表中獲取一臺服務器地址 InetSocketAddress addr = serverAddrs.get(nextAddrToTry); nextAddrToTry++; if (nextAddrToTry == serverAddrs.size()) { nextAddrToTry = 0; } //經過nio註冊 SocketChannel sock; sock = SocketChannel.open(); sock.configureBlocking(false); sock.socket().setSoLinger(false, -1); sock.socket().setTcpNoDelay(true); try { sockKey = sock.register(selector, SelectionKey.OP_CONNECT); } catch (IOException e) { sock.close(); throw e; } //初始化緩存 lenBuffer.clear(); incomingBuffer = lenBuffer; } }
處理讀寫的主流程,主要是 NIO 操作(ClientCnxn.SendThread.doIO):
boolean doIO() throws InterruptedException, IOException { boolean packetReceived = false; //獲取socketchannel SocketChannel sock = (SocketChannel) sockKey.channel(); //若是可讀 if (sockKey.isReadable()) { //讀取數據到緩存中 int rc = sock.read(incomingBuffer); //直到緩存讀滿 if (!incomingBuffer.hasRemaining()) { //重置緩存 incomingBuffer.flip(); //若是讀取的是長度信息,讀取長度信息,而且分配相應緩存 if (incomingBuffer == lenBuffer) { int len = incomingBuffer.getInt(); incomingBuffer = ByteBuffer.allocate(len); } else if (!initialized) { //若是是connect命令的返回值,獲取session,timeout等相關信息 readConnectResult(); enableRead(); lenBuffer.clear(); //重置緩存 incomingBuffer = lenBuffer; initialized = true; } else { //讀取數據內容 readResponse(); //重置緩存 lenBuffer.clear(); incomingBuffer = lenBuffer; packetReceived = true; } } } //若是是寫 if (sockKey.isWritable()) { synchronized (outgoingQueue) { if (!outgoingQueue.isEmpty()) { //從outgoingQueue隊列中拿數據包寫入通道 ByteBuffer pbb = outgoingQueue.getFirst().bb; sock.write(pbb); if (!pbb.hasRemaining()) { sentCount++; Packet p = outgoingQueue.removeFirst(); if (p.header != null && p.header.getType() != OpCode.ping && p.header.getType() != OpCode.auth) { pendingQueue.add(p); } } } } } if (outgoingQueue.isEmpty()) { disableWrite(); } else { enableWrite(); } return packetReceived; }
處理返回結果:xid=-2 爲 ping 命令的返回結果;xid=-4 爲 auth 命令的返回結果;xid=-1 爲服務器主動發送的 notification;其他 xid 爲普通命令的返回結果。
void readResponse() throws IOException { //對返回數據進行反序列化 ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); //根據返回頭信息,封裝想要的事件,放入事件隊列中,交給eventthread處理 //向消息隊列放入驗證失敗消息 if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { zooKeeper.state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); } return; } // if (replyHdr.getXid() == -1) { WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else event.setPath(serverPath.substring(chrootPath.length())); } WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return; } //反序列化返回結果 Packet packet = null; synchronized (pendingQueue) { packet = pendingQueue.remove(); } try { packet.replyHeader.setXid(replyHdr.getXid()); packet.replyHeader.setErr(replyHdr.getErr()); packet.replyHeader.setZxid(replyHdr.getZxid()); if (replyHdr.getZxid() > 0) { lastZxid = replyHdr.getZxid(); } if (packet.response != null && replyHdr.getErr() == 0) { packet.response.deserialize(bbia, "response"); } } finally { finishPacket(packet); } }
/**
 * Completes a packet whose reply has been processed.
 *
 * <p>Any pending watch registration is registered with the reply's error code first. A packet
 * with a callback is handed to the event thread; otherwise the thread blocked on the packet's
 * monitor is woken.
 *
 * @param p the finished packet
 */
private void finishPacket(Packet p) {
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }
    if (p.cb != null) {
        // Asynchronous request: the event thread will invoke the callback.
        p.finished = true;
        eventThread.queuePacket(p);
        return;
    }
    // Synchronous request: publish completion and wake any waiter on this packet.
    synchronized (p) {
        p.finished = true;
        p.notifyAll();
    }
}
事件線程主要是處理回調函數(ClientCnxn.EventThread.run):
public void run() { try { isRunning = true; while (true) { //從隊列中獲取事件 Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { //處理事件 processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); } LOG.info("EventThread shut down"); }
處理回調函數和watcher
private void processEvent(Object event) { try { //處理watcher if (event instanceof WatcherSetEventPair) { WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } else { // Packet p = (Packet) event; int rc = 0; String clientPath = p.clientPath; if (p.replyHeader.getErr() != 0) { rc = p.replyHeader.getErr(); } if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) { StatCallback cb = (StatCallback) p.cb; if (rc == 0) { if (p.response instanceof ExistsResponse) { cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response) .getStat()); } else if (p.response instanceof SetDataResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response) .getStat()); } else if (p.response instanceof SetACLResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response) .getStat()); } } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetDataResponse) { DataCallback cb = (DataCallback) p.cb; GetDataResponse rsp = (GetDataResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getData(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetACLResponse) { ACLCallback cb = (ACLCallback) p.cb; GetACLResponse rsp = (GetACLResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getAcl(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetChildrenResponse) { ChildrenCallback cb = (ChildrenCallback) p.cb; GetChildrenResponse rsp = (GetChildrenResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren()); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response 
instanceof GetChildren2Response) { Children2Callback cb = (Children2Callback) p.cb; GetChildren2Response rsp = (GetChildren2Response) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof CreateResponse) { StringCallback cb = (StringCallback) p.cb; CreateResponse rsp = (CreateResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, (chrootPath == null ? rsp.getPath() : rsp.getPath() .substring(chrootPath.length()))); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.cb instanceof VoidCallback) { VoidCallback cb = (VoidCallback) p.cb; cb.processResult(rc, clientPath, p.ctx); } } } catch (Throwable t) { LOG.error("Caught unexpected throwable", t); } }
Packet結構
Packet(包)是 ClientCnxn 內部用於管理請求內容的結構,由以下幾個字段組成:
1.RequestHeader header 請求頭
2.Record request 請求內容
3.ByteBuffer bb 實際需要發送的請求內容(序列化後的字節)。
4.ReplyHeader replyHeader 響應頭
5.Record response 響應內容
6.String clientPath
7.String serverPath
8.boolean finished
9.AsyncCallback cb
10.Object ctx
11.WatchRegistration watchRegistration
/**
 * Creates a packet.
 *
 * <p>If {@code bb} is non-null it is sent as-is. Otherwise the frame is built here: a 4-byte
 * length placeholder, then the header (if any), then the request record (if any). The
 * placeholder is then overwritten with the frame length excluding those 4 bytes, and the buffer
 * is rewound for writing to the socket.
 *
 * @param header request header; may be null (doIO also tolerates header-less packets)
 * @param replyHeader reply header filled in when the reply arrives
 * @param record request body; may be null
 * @param response record the reply body is deserialized into; may be null
 * @param bb pre-serialized frame, or null to serialize header/record here
 * @param watchRegistration watch to register on completion; may be null
 */
Packet(RequestHeader header, ReplyHeader replyHeader, Record record, Record response,
        ByteBuffer bb, WatchRegistration watchRegistration) {
    this.header = header;
    this.replyHeader = replyHeader;
    this.request = record;
    this.response = response;
    if (bb != null) {
        this.bb = bb;
    } else {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            boa.writeInt(-1, "len"); // We'll fill this in later
            if (header != null) {
                header.serialize(boa, "header");
            }
            if (record != null) {
                record.serialize(boa, "request");
            }
            baos.close();
            this.bb = ByteBuffer.wrap(baos.toByteArray());
            // The buffer is at position 0, so this overwrites the placeholder with the length of
            // everything after the 4-byte prefix.
            this.bb.putInt(this.bb.capacity() - 4);
            this.bb.rewind();
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
    }
    this.watchRegistration = watchRegistration;
}